MapReduce

Java
260
0
0
2024-02-26

MapReduce定义

常用数据序列化类型

Java类型

Hadoop Writable类型

boolean

BooleanWritable

byte

ByteWritable

int

IntWritable

float

FloatWritable

long

LongWritable

double

DoubleWritable

String

Text

map

MapWritable

array

ArrayWritable

MapReduce编程规范

用户编写的程序分成三个部分:Mapper、Reducer和Driver。

1、Mapper阶段

(1)用户自定义的Mapper要继承自己的父类

(2)Mapper的输入数据是KV对的形式(KV的类型可自定义)

(3)Mapper中的业务逻辑写在map()方法中

(4)Map的输出数据是KV对的形式(KV的类型可自定义)

(5)map()方法(MapTask进程)对每一个<K,V>调用一次

2、Reducer阶段

(1)用户自定义的Reduce要继自己的父类

(2) Reducer的输入 数据类型对应Mappe的输出数据类型,也是KV

(3)Reducer的业务逻捐写在reduce()方法中

(4) ReduceTask进程对每-组相同k的<K,V>组调用一次reduce()方法

3、Driver阶段

相当于YARN集群的客户端,用于提交我们整个程序到YARN集群,提交的是封装了MapReduce程序相关运行参数的job对象

WordCount案例实操

(1)需求

在给定的文本文件中统计输出每一个单词出现的总次数

(2)期望输出数据

banzhang 1

cls 2

hadoop 1

jiao 1

ss 2

xue 1

(3)项目实操

pom.xml
<dependencies>
		<dependency>
			<groupId>jdk.tools</groupId>
			<artifactId>jdk.tools</artifactId>
			<version>1.8</version>
			<scope>system</scope>
			<systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
		</dependency>
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>RELEASE</version>
		</dependency>
		<dependency>
			<groupId>org.apache.logging.log4j</groupId>
			<artifactId>log4j-core</artifactId>
			<version>2.8.2</version>
		</dependency>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-common</artifactId>
			<version>2.7.2</version>
		</dependency>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-client</artifactId>
			<version>2.7.2</version>
		</dependency>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-hdfs</artifactId>
			<version>2.7.2</version>
		</dependency>
	</dependencies>
log4j.properties
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
WordCountMapper
package com.imooc.mr;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

//map阶段

//第一个泛型KEYIN 输入数据的key 
//第二个泛型VALUEIN 输入数据的value
//第三个泛型KEYOUT 输出数据的key的类型   sa,1   ss,1
//第四个泛型VALUEOUT 输出的数据的value类型
public class WordCountMapper extends Mapper<LongWritable,Text,Text,IntWritable> {

	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		
		//获取一行
		String line=value.toString();
		//切分
		String[] words=line.split(" ");
		//循环
		for (String string : words) {
			//SS
			Text k=new Text();
			k.set(string);
			//1
			IntWritable v=new IntWritable();
			v.set(1);
			context.write(k, v);
		}
		
	}
}
WordCountReducer
package com.imooc.mr;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
//KETIN KEYOUT map阶段输出的key和value
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

	@Override
	protected void reduce(Text key, Iterable<IntWritable> values, Context context)
			throws IOException, InterruptedException {
		int sum=0;
		
		//累加求和
		for (IntWritable value : values) {
			sum+=value.get();
		}
		
		IntWritable v=new IntWritable();
		v.set(sum);
		context.write(key, v);

	}

}
WordcountDriver
package com.imooc.mr;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordcountDriver {

	public static void main(String[] args) throws Exception {
		
		//输入路径(处理E:\temp\input下的***文件)
		String inputPath="E:\\temp\\input";
		//输出路径(output文件夹不能存在,否则报错)
		String outputPath="E:\\temp\\output";
		
		Configuration conf = new Configuration();

		// 1 获取Job对象
		Job job = Job.getInstance(conf);

		// 2 设置jar存储位置
		job.setJarByClass(WordcountDriver.class);

		// 3 关联Map和Reduce类
		job.setMapperClass(WordCountMapper.class);
		job.setReducerClass(WordCountReducer.class);

		// 4 设置Mapper阶段输出数据的key和value类型
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);

		// 5 设置最终数据输出的key和value类型
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);

		// 6 设置输入路径和输出路径
		FileInputFormat.setInputPaths(job, new Path(inputPath));

		FileOutputFormat.setOutputPath(job, new Path(outputPath));

		// 7 提交job
		// job.submit();
		job.waitForCompletion(true);
		
		System.out.println("-------OVER-----------");
	}

}

Hadoop序列化

为什么不用java的序列化?

Java的序列化是一个重量级序列化框架(Serializable),一个对象被序列化后,会附带很多额外的信息(各种校验信息,Header传输。所以,Hadoop自己开发了一套序列化机制(Writable).

Hadoop序列化特点:

1)紧凑:高效实用存储空间

2)快速:读写数据的额外开销小

3)可扩展:随着通信协议的升级而升级

4)互操作:支持多语言的交互

自定义bean对象实现序列化接口(Writable)

自定义对象序列化步骤:

(1)必须实现Writable接口

(2)反序列化时,需要反射调用空参构造函数,所以必须有空参构造

public FlowBean() {
super();
}

(3)重写序列化方法

@Override
public void write(DataOutput out) throws IOException {
out.writeLong(upFlow);
out.writeLong(downFlow);
out.writeLong(sumFlow);
}

(4)重写反序列化方法

@Override
public void readFields(DataInput in) throws IOException {
upFlow = in.readLong();
downFlow = in.readLong();
sumFlow = in.readLong();
}

(5)注意反序列化的顺序和序列化的顺序完全一致

(6)要想把结果显示在文件中,需要重写toString(),可用”\t”分开,方便后续用。

(7)如果需要将自定义的bean放在key中传输,则还需要实现Comparable接口,因为MapReduce框中的Shuffle过程要求对key必须能排序。详见后面排序案例。

@Override
public int compareTo(FlowBean o) {
// 倒序排列,从大到小
return this.sumFlow > o.getSumFlow() ? -1 : 1;
}

序列化案例实操

需求

统计每一个手机号耗费的总上行流量、下行流量、总流量

输入数据

期望输出数据

13560436666 1116 954 2070

手机号码 上行流量 下行流量 总流量

编写MapReduce程序

实例化Bean对象
package com.imooc.flowsum;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

//步骤1 实现Weitable接口
public class FlowBean implements Writable {

	private long upFlow;// 上行流量
	private long downFlow;// 下行流量
	private long sumFlow;// 总流量

	// 步骤2:无参构造
	public FlowBean() {
	}

	// 有参构造
	public FlowBean(long upFlow, long downFlow) {
		this.upFlow = upFlow;
		this.downFlow = downFlow;
		this.sumFlow = upFlow + downFlow;
	}

	// 步骤3:序列化方法
	@Override
	public void write(DataOutput out) throws IOException {
		out.writeLong(upFlow);
		out.writeLong(downFlow);
		out.writeLong(sumFlow);

	}

	// 步骤4:反序列化方法
	// 反序列化方法读顺序必须和写序列化方法的写顺序必须一致
	@Override
	public void readFields(DataInput in) throws IOException {
		//必须与序列化中顺序一致
		this.upFlow = in.readLong();
		this.downFlow = in.readLong();
		this.sumFlow = in.readLong();

	}

	// 步骤6:为后续方便,重新toString方法
	@Override
	public String toString() {
		return upFlow + "\t" + downFlow + "\t" + sumFlow;
	}

	public long getUpFlow() {
		return upFlow;
	}

	public void setUpFlow(long upFlow) {
		this.upFlow = upFlow;
	}

	public long getDownFlow() {
		return downFlow;
	}

	public void setDownFlow(long downFlow) {
		this.downFlow = downFlow;
	}

	public long getSumFlow() {
		return sumFlow;
	}

	public void setSumFlow(long sumFlow) {
		this.sumFlow = sumFlow;
	}

	

}
Mapper
package com.imooc.flowsum;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FlowBeanMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
	

	@Override
	protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

		 // 1 获取一行
		 String line = value.toString();
		 // 2 切割字段
		 String[] words = line.split("\t");
		 // 3 封装对象
		 Text k = new Text();
		 k.set(words[1]);//取出手机号码
		
		 // 取出上行流量和下行流量
		 long upFlow = Long.parseLong(words[words.length - 3]);
		 long downFlow = Long.parseLong(words[words.length - 2]);
		
		 FlowBean v = new FlowBean(upFlow, downFlow);
		 System.out.println("map输出的参数:"+v);
		 // 4 写出
		 context.write(k, v);


	}

}
Reducer
package com.imooc.flowsum;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FlowBeanReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

	@Override
	protected void reduce(Text key, Iterable<FlowBean> values, Context context)
			throws IOException, InterruptedException {

		long sum_up = 0;
		long sum_dowm = 0;

		for (FlowBean flowBean : values) {
			System.out.println("reduce输入的参数:"+flowBean);
			sum_up += flowBean.getUpFlow();
			sum_dowm += flowBean.getDownFlow();

		}

		FlowBean temp = new FlowBean(sum_up, sum_dowm);
		System.out.println("reduce输出的参数:"+temp);
		context.write(key, temp);
	}

}
Driver
package com.imooc.flowsum;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;



public class FlowBeanDriver {

	public static void main(String[] args) throws Exception {
		// 输入路径(处理E:\temp\input下的***文件)
		String inputPath = "E:\\temp\\input";
		// 输出路径(output文件夹不能存在,否则报错)
		String outputPath = "E:\\temp\\output";

		Configuration conf = new Configuration();

		// 1 获取Job对象
		Job job = Job.getInstance(conf);

		// 2 设置jar存储位置(当前类.class)
		job.setJarByClass(FlowBeanDriver.class);

		// 3 关联Map和Reduce类
		job.setMapperClass(FlowBeanMapper.class);
		job.setReducerClass(FlowBeanReducer.class);

		// 4 设置Mapper阶段输出数据的key和value类型
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(FlowBean.class);

		// 5 设置最终数据输出的key和value类型
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(FlowBean.class);

		// 6 设置输入路径和输出路径
		FileInputFormat.setInputPaths(job, new Path(inputPath));
		FileOutputFormat.setOutputPath(job, new Path(outputPath));

		// 7 提交job
		// job.submit();
		job.waitForCompletion(true);

		System.out.println("-------OVER-----------");

	}

}

FileInputFormat实现类

FileInoutFormat常见的接口实现类包括:TextInputFormat、KeyValueTextInputFormat、NLineInputFormat、CombineTextInputFormat和自定义InputFormat等。

TextInputFormat

TextInputFormat是默认的FileInputFormat实现类。按行读取每条记录。键是存储该行在整个文件中起始字节偏移量,LongWritable类型。值是这行的内容,不包括任何终止符(换行符和回车符),Text类型。

一下是一个实例,比如一个分片包含一下四条记录

每条记录表示为以下键值对

KeyValueTextInputFormat

每一行均为一条记录,被分隔符分为key,value.可以通过咋驱动类中设置conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR,"\t");来设定分隔符。默认分隔符为tab(\t)。

以下是一个实例,输入的是一个包含四条记录的分片。其中--->表示一个(水平方向的)制表符

每条记录表示为一下键/值对

KeyValueTextInputFormat实操

1需求:

统计输入文件中每一行的第一个单词相同的行数。

输入数据:

banzhang ni hao
xihuan hadoop banzhang
banzhang ni hao
xihuan hadoop banzhang

期望输出数据:

banzhang	2
xihuan	2

2注意事项

// 设置切割符
conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, " ");
// 设置输入格式
job.setInputFormatClass(KeyValueTextInputFormat.class);

3代码实现

Mapper

package com.imooc.keyvaluetext;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class KVTextMapper extends Mapper<Text, Text, Text, IntWritable> {

	protected void map(Text key, Text value, Context context) throws java.io.IOException, InterruptedException {

		IntWritable v = new IntWritable(1);

		context.write(key, v);

	};

}

Reducer

package com.imooc.keyvaluetext;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class KVTextReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

	protected void reduce(Text key, java.lang.Iterable<IntWritable> values, Context context)
			throws java.io.IOException, InterruptedException {

		int sum = 0;

		for (IntWritable value : values) {

			sum++;

		}
		
		context.write(key, new IntWritable(sum));

	};

}

Driver

package com.imooc.keyvaluetext;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueLineRecordReader;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class KVTextDriver {
	public static void main(String[] args) throws Exception {
		// 输入路径(处理E:\temp\input下的***文件)
		String inputPath = "E:\\temp\\input";
		// 输出路径(output文件夹不能存在,否则报错)
		String outputPath = "E:\\temp\\output";

		Configuration conf = new Configuration();
		// 设置切割符(KeyValueInputFormat重点配置)(KeyValueInputFormat重点配置)(KeyValueInputFormat重点配置)
		conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, " ");

		// 1 获取Job对象
		Job job = Job.getInstance(conf);

		// 2 设置jar存储位置
		job.setJarByClass(KVTextDriver.class);

		// 3 关联Map和Reduce类
		job.setMapperClass(KVTextMapper.class);
		job.setReducerClass(KVTextReducer.class);

		// 4 设置Mapper阶段输出数据的key和value类型
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);

		// 5 设置最终(reducer)数据输出的key和value类型
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);

		// 设置输入格式(KeyValueInputFormat重点配置)(KeyValueInputFormat重点配置)(KeyValueInputFormat重点配置)
		job.setInputFormatClass(KeyValueTextInputFormat.class);

		// 6 设置输入路径和输出路径
		FileInputFormat.setInputPaths(job, new Path(inputPath));

		FileOutputFormat.setOutputPath(job, new Path(outputPath));

		// 7 提交job
		// job.submit();
		job.waitForCompletion(true);

		System.out.println("-------OVER-----------");
	}

}

NLineInputFormat

如果使用NLineInputFormat,代表每个进程处理的InputSpit不再按Block块去划分,而是按照NLineInputFormat指定的行数N进行划分。即输入文件的总行数/N=切片数,如果不整切,切片数=商+1

Key/value和TextInputFormat一样

原始数据

N=2切片结果

N=3切片结果

代码注意事项

//设置每个切片InputSplit中划分三条记录 NLineInputFormat.setNumLinesPerSplit(job, 3);   
//使用NLineInputFormat处理记录数  
job.setInputFormatClass(NLineInputFormat.class);  

自定义InputFormat

自定义InputFormat步骤如下:
  1. 自定义一个类继承FileInputFormat
  2. 改写RecordReader,实现一次读取一个完整文件封装为KV。
  3. 在输出时使用SequenceFileOutPutFormat输出合并文件。

Shuffle机制

Map方法之后,Reduce方法之前的数据处理过程称之为Shuffle。

Partition分区

问题引出

要求将统计结果按照条件输出到不同文件中(分区)。比如:将统计结果按照手机归属地不同的省份输入到不同的文件中(分区)

默认分区

默认分区是根据key的hashcode对ReduceTasks个数取模得到的。用户没发控制哪个key存储到哪个分区。

自定义分区步骤

  • 自定义继承Partitioner,重写getPartition()方法。
  • 在job驱动中,设置自定义的Partitioner。
//  指定自定义数据分区
job.setPartitionerClass(ProvincePartitioner.class);
  • 自定义PPartition后,要根据自定义Partitioner的逻辑设置响应数量的ReduceTask。
//  同时指定相应数量的reduce task
job.setNumReduceTasks(5);

自定义分区实操(在上面的序列化案例实操项目的基础上进行实操)

需求:

将手机号开头为136、137、138、139和其他的分别放置5个文件中

输入数据:
1	13736230513	192.196.100.1	www.atguigu.com	2481	24681	200
2	13846544121	192.196.100.2			264	0	200
3 	13956435636	192.196.100.3			132	1512	200
4 	13966251146	192.168.100.1			240	0	404
5 	18271575951	192.168.100.2	www.atguigu.com	1527	2106	200
6 	84188413	192.168.100.3	www.atguigu.com	4116	1432	200
7 	13590439668	192.168.100.4			1116	954	200
8 	15910133277	192.168.100.5	www.hao123.com	3156	2936	200
9 	13729199489	192.168.100.6			240	0	200
10 	13630577991	192.168.100.7	www.shouhu.com	6960	690	200
11 	15043685818	192.168.100.8	www.baidu.com	3659	3538	200
12 	15959002129	192.168.100.9	www.atguigu.com	1938	180	500
13 	13560439638	192.168.100.10			918	4938	200
14 	13470253144	192.168.100.11			180	180	200
15 	13682846555	192.168.100.12	www.qq.com	1938	2910	200
16 	13992314666	192.168.100.13	www.gaga.com	3008	3720	200
17 	13509468723	192.168.100.14	www.qinghua.com	7335	110349	404
18 	18390173782	192.168.100.15	www.sogou.com	9531	2412	200
19 	13975057813	192.168.100.16	www.baidu.com	11058	48243	200
20 	13768778790	192.168.100.17			120	120	200
21 	13568436656	192.168.100.18	www.alibaba.com	2481	24681	200
22 	13568436656	192.168.100.19			1116	954	200
期望输出结果:

将手机号开头为136、137、138、139和其他的分别放置5个文件中

136

分区0

137

分区1

138

分区2

139

分区3

其他

分区4

自定义partition类
package com.imooc;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

import com.imooc.flowsumpartition.FlowBean;

//自定义partition
public class ProvincePartitioner extends Partitioner<Text, com.imooc.flowsumpartition.FlowBean> {

	@Override
	public int getPartition(Text key, FlowBean value, int numPartitions) {
		// 1 获取电话号码的前三位
		String preNum = key.toString().substring(0, 3);

		int partition = 4;

		// 2 判断是哪个省
		if ("136".equals(preNum)) {
			partition = 0;
		} else if ("137".equals(preNum)) {
			partition = 1;
		} else if ("138".equals(preNum)) {
			partition = 2;
		} else if ("139".equals(preNum)) {
			partition = 3;
		}

		return partition;
	}

}
Driver驱动类
package com.imooc.flowsumpartition;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.imooc.ProvincePartitioner;

public class FlowBeanDriver {

	public static void main(String[] args) throws Exception {
		// 输入路径(处理E:\temp\input下的***文件)
		String inputPath = "E:\\temp\\input";
		// 输出路径(output文件夹不能存在,否则报错)
		String outputPath = "E:\\temp\\output";

		Configuration conf = new Configuration();

		// 1 获取Job对象
		Job job = Job.getInstance(conf);

		// 2 设置jar存储位置(当前类.class)
		job.setJarByClass(FlowBeanDriver.class);

		// 3 关联Map和Reduce类
		job.setMapperClass(FlowBeanMapper.class);
		job.setReducerClass(FlowBeanReducer.class);

		// 4 设置Mapper阶段输出数据的key和value类型
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(FlowBean.class);

		// 5 设置最终数据输出的key和value类型
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(FlowBean.class);

		// 指定自定义数据分区(自定义partition重要配置)(自定义partition重要配置)(自定义partition重要配置)
		job.setPartitionerClass(ProvincePartitioner.class);
		// 同时指定相应数量的reduce
		// task(自定义partition重要配置)(自定义partition重要配置)(自定义partition重要配置)
		job.setNumReduceTasks(5);

		// 6 设置输入路径和输出路径
		FileInputFormat.setInputPaths(job, new Path(inputPath));
		FileOutputFormat.setOutputPath(job, new Path(outputPath));

		// 7 提交job
		// job.submit();
		job.waitForCompletion(true);

		System.out.println("-------OVER-----------");

	}

}

分区总结

  1. 如果ReduceTask的数据>getPartition的结果数,则会多产生几个空的输出文件part-r-000xx;
  2. 如果ReduceTask的数据<getPartition的结果数,则会Exception;
  3. 如果ReduceTask的数据==1,分区效果不起作用,最终也就只会产生一个结果part-r-00000;

WritableComparable排序(全排序)(全排序)(全排序)

排序分类

  • 部分排序

MapReduce根据输入记录的键对数据集排序,保证输出的每个文件内部有序。

  • 全排序

最终输出的结果只有一个文件,且文件内部有序。实现方式是只设置一个ReduceTask。但该方法在处理大型文件时效率极低,因为一台机器处理所以文件,完全丧失了MapReduce所提供的并行架构。

  • 辅助排序(GroupingComparator分组):

在Reduce端对key进行分组。应用于:在接收的key为bean对象时,想让一个或者几个字段相同(全部字段比较不同)的key进入到同一个reduce方法时,可以采用分组排序

  • 二次排序

在自定义排序过程中,如果compareTo中的判断条件为两个即为二次排序

WritableComparable排序案例实操

需求

将数据按照总流量(上行流量+下行流量)从大到小排序

输入数据

手机号 上行流量 下行流量 总流量

13470253144	180	180	360
13509468723	7335	110349	117684
13560439638	918	4938	5856
13568436656	3597	25635	29232
。。。
预期结果(按照最后一列排序)
13509468723	7335	110349	117684
13736230513	2481	24681	27162
13956435636	132	1512    1644
13846544121	264	0	264
实体类实现WritableComparable接口
package com.imooc.compareto;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;

//步骤1 实现WritableComparable接口
public class FlowBean implements WritableComparable<FlowBean> {

	private long upFlow;// 上行流量
	private long downFlow;// 下行流量
	private long sumFlow;// 总流量

	// 步骤2:无参构造
	public FlowBean() {
	}

	// 有参构造
	public FlowBean(long upFlow, long downFlow) {
		this.upFlow = upFlow;
		this.downFlow = downFlow;
		this.sumFlow = upFlow + downFlow;
	}

	// 步骤3:序列化方法
	@Override
	public void write(DataOutput out) throws IOException {
		out.writeLong(upFlow);
		out.writeLong(downFlow);
		out.writeLong(sumFlow);

	}

	// 步骤4:反序列化方法
	// 反序列化方法读顺序必须和写序列化方法的写顺序必须一致
	@Override
	public void readFields(DataInput in) throws IOException {
		// 必须与序列化中顺序一致
		this.upFlow = in.readLong();
		this.downFlow = in.readLong();
		this.sumFlow = in.readLong();

	}

	 步骤5:比较
	@Override
	public int compareTo(FlowBean bean) {
		int result;
		// 按照总流量大小,倒序排列
		if (sumFlow > bean.getSumFlow()) {
			result = -1;
		} else if (sumFlow < bean.getSumFlow()) {
			result = 1;
		} else {
			result = 0;
		}
		return result;
	}

	// 步骤6:为后续方便,重新toString方法
	@Override
	public String toString() {
		return upFlow + "\t" + downFlow + "\t" + sumFlow;
	}

	public long getUpFlow() {
		return upFlow;
	}

	public void setUpFlow(long upFlow) {
		this.upFlow = upFlow;
	}

	public long getDownFlow() {
		return downFlow;
	}

	public void setDownFlow(long downFlow) {
		this.downFlow = downFlow;
	}

	public long getSumFlow() {
		return sumFlow;
	}

	public void setSumFlow(long sumFlow) {
		this.sumFlow = sumFlow;
	}

}
Mapper
package com.imooc.compareto;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FlowBeanMapper extends Mapper<LongWritable, Text, FlowBean, Text> {
	

	@Override
	protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

		 // 1 获取一行
		 String line = value.toString();
		 // 2 切割字段
		 String[] words = line.split("\t");
		 // 3 封装对象
		 Text v = new Text();
		 v.set(words[0]);//取出手机号码
		
		 // 取出上行流量和下行流量
		 long upFlow = Long.parseLong(words[1]);
		 long downFlow = Long.parseLong(words[2]);
		 long sumFlow=Long.parseLong(words[3]);
		
		 FlowBean k = new FlowBean(upFlow, downFlow);
		 // 4 写出
		 context.write(k, v);


	}

}
Reducer
package com.imooc.compareto;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FlowBeanReducer extends Reducer<FlowBean, Text, Text, FlowBean> {

	@Override
	protected void reduce(FlowBean key, Iterable<Text> values, Context context)
			throws IOException, InterruptedException {
		
		
		for (Text text : values) {
			
			context.write(text, key);
			
		}
		
	}

}
Driver
package com.imooc.compareto;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;



public class FlowBeanDriver {

	public static void main(String[] args) throws Exception {
		// 输入路径(处理E:\temp\input下的***文件)
		String inputPath = "E:\\temp\\input";
		// 输出路径(output文件夹不能存在,否则报错)
		String outputPath = "E:\\temp\\output";

		Configuration conf = new Configuration();

		// 1 获取Job对象
		Job job = Job.getInstance(conf);

		// 2 设置jar存储位置(当前类.class)
		job.setJarByClass(FlowBeanDriver.class);

		// 3 关联Map和Reduce类
		job.setMapperClass(FlowBeanMapper.class);
		job.setReducerClass(FlowBeanReducer.class);

		// 4 设置Mapper阶段输出数据的key和value类型
		job.setMapOutputKeyClass(FlowBean.class);
		job.setMapOutputValueClass(Text.class);

		// 5 设置最终数据输出的key和value类型
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(FlowBean.class);

		// 6 设置输入路径和输出路径
		FileInputFormat.setInputPaths(job, new Path(inputPath));
		FileOutputFormat.setOutputPath(job, new Path(outputPath));

		// 7 提交job
		// job.submit();
		job.waitForCompletion(true);

		System.out.println("-------OVER-----------");

	}

}

WritableComparable排序(分区排序)(分区排序)(分区排序)

需求

要求每个省份(136、137、148、139、其他)手机号输出的文件中按照总流量内部排序。

需求分析:基于前一个需求WritableComparableq排序(全排序),增加自定义分区类,分区按照省份手机号设置。
自定义分区
package com.imooc.compareto_partition;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

//泛型为map的输出类型,即reduce的输入类型
public class ProvincePartitioner extends Partitioner<FlowBean, Text> {

	@Override
	public int getPartition(FlowBean key, Text value, int numPartitions) {
		// 1 获取电话号码的前三位
		String preNum = value.toString().substring(0, 3);

		int partition = 4;

		// 2 判断是哪个省
		if ("136".equals(preNum)) {
			partition = 0;
		} else if ("137".equals(preNum)) {
			partition = 1;
		} else if ("138".equals(preNum)) {
			partition = 2;
		} else if ("139".equals(preNum)) {
			partition = 3;
		}

		return partition;

	}

}
Driver启动类

核心代码

// 指定自定义数据分区(自定义partition重要配置)(自定义partition重要配置)(自定义partition重要配置)
job.setPartitionerClass(ProvincePartitioner.class);
// 同时指定相应数量的reducetask(自定义partition重要配置)(自定义partition重要配置)(自定义partition重要配置)
job.setNumReduceTasks(5);
package com.imooc.compareto_partition;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class FlowBeanDriver {

	public static void main(String[] args) throws Exception {
		// 输入路径(处理E:\temp\input下的***文件)
		String inputPath = "E:\\temp\\input";
		// 输出路径(output文件夹不能存在,否则报错)
		String outputPath = "E:\\temp\\output";

		Configuration conf = new Configuration();

		// 1 获取Job对象
		Job job = Job.getInstance(conf);

		// 2 设置jar存储位置(当前类.class)
		job.setJarByClass(FlowBeanDriver.class);

		// 3 关联Map和Reduce类
		job.setMapperClass(FlowBeanMapper.class);
		job.setReducerClass(FlowBeanReducer.class);

		// 4 设置Mapper阶段输出数据的key和value类型
		job.setMapOutputKeyClass(FlowBean.class);
		job.setMapOutputValueClass(Text.class);

		// 5 设置最终数据输出的key和value类型
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(FlowBean.class);

		// 指定自定义数据分区(自定义partition重要配置)(自定义partition重要配置)(自定义partition重要配置)
		job.setPartitionerClass(ProvincePartitioner.class);
		// 同时指定相应数量的reduce
		// task(自定义partition重要配置)(自定义partition重要配置)(自定义partition重要配置)
		job.setNumReduceTasks(5);

		// 6 设置输入路径和输出路径
		FileInputFormat.setInputPaths(job, new Path(inputPath));
		FileOutputFormat.setOutputPath(job, new Path(outputPath));

		// 7 提交job
		// job.submit();
		job.waitForCompletion(true);

		System.out.println("-------OVER-----------");

	}

}

Combiner合并

简介

  1. Combiner是MR程序中Mapper和Reducer之外的一种组件
  2. Combiner组件的父类就是Reducer
  3. Combiner和Reducer的区别在于运行的位置(Combiner是在每一个MapTask所在的节点运行)(Reducer是在接收全局所有Mapper的输出结果)
  4. Combiner的意义就是对每一个MapTask的输出进行局部徽章,以减少网络传输量。
  5. Combiner能够应用的前提是不能影响最终的业务逻辑,而且,Combiner的输出KV应该和Reducer的输入KV类型对应起来

GroupingComparator分组(辅助排序)

在reduce端对key进行分组,应用于:在接收的key为bean对象时,想让一个或几个字段相同(全部字段比较不同)的key进入到同一个reduce方法时,可以采用分组排序

分组排序步骤:

  • (1)自定义类继承WritableComparator
  • (2)重写compare()方法
@Override
public int compare(WritableComparable a, WritableComparable b) {
// 比较的业务逻辑
return result;
}
  • (3)创建一个构造将比较对象的类传给父类
protected OrderGroupingComparator() {
super(OrderBean.class, true);
}

(4)在驱动里设置关联

// 8 设置reduce端的分组
job.setGroupingComparatorClass(OrderGroupingComparator.class);

GroupingComparator分组实操:

需求:统计一个订单里价格最高的数据

我认为,GroupingComparator所解决的问题是:当一个entity里有多个属性(id,price,age'等等)需要排序时候,需要将entity作为作为mapper的输出K,这样才能排序,但是因为(id,price,age)只要有一个不同,他们就不同,所以每一个k(entity)都是不同的,而我又想将id相同的但是实际不同的K伪装成相同的放在一个reduce中

比如:

(id,price) A(1,11) B(1,33) C(2,33) 三个数都作为map的输出K是不相同(假设我实体类的compare方法中先按id排序,在按price排序)的,所以要进入3个reduce中,如下图

但是我的需求是id相同的进入一个reduce中,所以出现了GroupingComparator

在通俗一点:在reduce端对key进行分组,应用于:在接收的key为bean对象时,想让一个或几个字段相同(全部字段比较不同)的key进入到同一个reduce方法时,可以采用分组排序

输入数据:

订单号 商品号 价格

0000001	Pdt_01	222.8
0000002	Pdt_05	722.4
0000001	Pdt_02	33.8
0000003	Pdt_06	232.8
0000003	Pdt_02	33.8
0000002	Pdt_03	522.8
0000002	Pdt_04	122.4
期望输出结果:
1	222.8
2	722.4
3	232.8
需求分析:
  1. 利用“订单id和成交金额”作为key,可以将Map阶段读取到的所有订单数据按照id升序排序,如果id相同再按照金额降序排序,发送到Reduce。
  2. 在Reduce端利用groupingComparator将订单id相同的kv聚合成组,然后取第一个即是该订单中最贵商品,

封装的实体类(实现WritableComparable接口)

package com.imooc.GroupingComparator;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class OrderBean implements WritableComparable<OrderBean> {

	private int order_id;
	private double price;

	@Override
	public void readFields(DataInput in) throws IOException {
		this.order_id = in.readInt();
		this.price = in.readDouble();

	}

	@Override
	public void write(DataOutput out) throws IOException {
		out.writeInt(order_id);
		out.writeDouble(price);

	}

	// 二次排序
	@Override
	public int compareTo(OrderBean o) {

		int result;
        //先按id升序,在按价格降序
		if (order_id > o.getOrder_id()) {
			result = 1;
		} else if (order_id < o.getOrder_id()) {
			result = -1;
		} else {
			// 价格倒序排序
			result = price > o.getPrice() ? -1 : 1;
		}

		return result;
	}

	public OrderBean() {
	}

	public int getOrder_id() {
		return order_id;
	}

	public void setOrder_id(int order_id) {
		this.order_id = order_id;
	}

	public double getPrice() {
		return price;
	}

	public void setPrice(double price) {
		this.price = price;
	}

	@Override
	public String toString() {
		return order_id + "\t" + price;
	}

}

Mapper

package com.imooc.GroupingComparator;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class OrderMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {

	@Override
	protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
		// 1 获取一行
		String line = value.toString();

		// 2 截取
		String[] fields = line.split("\t");

		// 3 封装对象
		OrderBean k = new OrderBean();
		k.setOrder_id(Integer.parseInt(fields[0]));
		k.setPrice(Double.parseDouble(fields[2]));

		// 4 写出
		context.write(k, NullWritable.get());
	}

}

Reducer

package com.imooc.GroupingComparator;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class OrderReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {

	@Override
	protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context)
			throws IOException, InterruptedException {
		
		context.write(key, NullWritable.get());

	}

}

继承GroupingComparator的类

package com.imooc.GroupingComparator;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class OrderGroupingComparator extends WritableComparator {

	protected OrderGroupingComparator() {
		super(OrderBean.class, true);
	}

	@Override
	public int compare(WritableComparable a, WritableComparable b) {

		OrderBean aBean = (OrderBean) a;
		OrderBean bBean = (OrderBean) b;

		int result;
		if (aBean.getOrder_id() > bBean.getOrder_id()) {
			result = 1;
		} else if (aBean.getOrder_id() < bBean.getOrder_id()) {
			result = -1;
		} else {
			result = 0;
		}

		return result;
	}
}

Driver驱动类

package com.imooc.GroupingComparator;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class OrderDriver {

	public static void main(String[] args) throws Exception {
		// 输入路径(处理E:\temp\input下的***文件)
		String inputPath = "E:\\temp\\input";
		// 输出路径(output文件夹不能存在,否则报错)
		String outputPath = "E:\\temp\\output";

		Configuration conf = new Configuration();

		// 1 获取Job对象
		Job job = Job.getInstance(conf);

		// 2 设置jar存储位置(当前类.class)
		job.setJarByClass(OrderDriver.class);

		// 3 关联Map和Reduce类
		job.setMapperClass(OrderMapper.class);
		job.setReducerClass(OrderReducer.class);

		// 4 设置Mapper阶段输出数据的key和value类型
		job.setMapOutputKeyClass(OrderBean.class);
		job.setMapOutputValueClass(NullWritable.class);

		// 5 设置最终数据输出的key和value类型
		job.setOutputKeyClass(OrderBean.class);
		job.setOutputValueClass(NullWritable.class);

		// 6 设置输入路径和输出路径
		FileInputFormat.setInputPaths(job, new Path(inputPath));
		FileOutputFormat.setOutputPath(job, new Path(outputPath));
		
		// 8 设置reduce端的分组
		job.setGroupingComparatorClass(OrderGroupingComparator.class);

		// 7 提交job
		// job.submit();
		job.waitForCompletion(true);

		System.out.println("-------OVER-----------");

	}

}

OutputFormat数据输出

OutputFormat接口实现类

OutputFormat是MapReduce输出的基类,所有 实现MapReduce输出都实现了OutputFormat接口。下面我们介绍几种常见的OutputFormat实现类。

  • 1.文本输出TextOutputFormat

默认的输出格式是TextOutputFormat ,它把每条记录写为文本行。它的键和值可以是任意类型,因为TextOutputFormat调用toString()方法把它们转换为字符串。

  • 2. SequenceFileOutputF ormat

将SequenceFileOutputFormat输出作为后续MapReduce任务的输入,这便是一种好的输出格式,因为它的格式紧凑,很客易被压缩。

  • 3.自定义OutputFormat

根据用户需求,自定义实现输出。

自定义OutputFormat步骤

  • 自定义一个类继承FileOutputFormat
  • 改写recordWriter,具体改写输出数据的方法write()
  • job.setOutputFormatClass(FilterOutputFormat.class);

自定义OutputFormat案例实操

需求:

过滤输入的log日志,包含atguigu的网站输出到e:/atguigu.log,不包含atguigu的网站输出到e:/other.log。

输入数据:
http://www.baidu.com
http://www.google.com
http://cn.bing.com
http://www.atguigu.com
http://www.sohu.com
http://www.sina.com
http://www.sin2a.com
http://www.sin2desa.com
http://www.sindsafa.com
期望输出数据:

Mapper

单纯的切割和写

package com.imooc.myoutputformat;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FilterMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

	@Override
	protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
		
		String line = value.toString();
		
		Text k=new Text(line);
		
		context.write(k, NullWritable.get());

	}

}
Reducer

单纯的写,但是要改一下格式,方便在输出文件里看,否则连在一起在一样

package com.imooc.myoutputformat;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FilterReducer extends Reducer<Text, NullWritable, Text, NullWritable> {

	@Override
	protected void reduce(Text key, Iterable<NullWritable> values, Context context)
			throws IOException, InterruptedException {

		// 1 获取一行
		String line = key.toString();

		// 2 拼接
		line = line + "\r\n";

		// 3 设置key
		Text k=new Text();
		k.set(line);

		for (NullWritable nullWritable : values) {

			context.write(k, NullWritable.get());
		}

	}

}
FRecordWriter

继承RecordWriter

初始化数据流并且完整业务逻辑

而且在这个类中还可以写入MySQL,Redis等数据库中

package com.imooc.myoutputformat;

import java.io.IOException;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class FRecordWriter extends RecordWriter<Text, NullWritable> {
	FSDataOutputStream fosatguigu;
	FSDataOutputStream fosother;

	// 初始化方法
	public FRecordWriter(TaskAttemptContext job){

		try {
			// 1、获取文件系统
			FileSystem fs = FileSystem.get(job.getConfiguration());
			fosatguigu = fs.create(new Path("e:/atguigu.log"));
			// 2、创建两个文件输出流
			fosother = fs.create(new Path("e:/other.log"));
		} catch (Exception e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}

	}

	// 业务逻辑类
	@Override
	public void write(Text key, NullWritable value) throws IOException, InterruptedException {
		try {
			// 判断是否包含“atguigu”输出到不同文件
			if (key.toString().contains("atguigu")) {
				fosatguigu.write(key.toString().getBytes());
			} else {
				fosother.write(key.toString().getBytes());
			}
		} catch (Exception e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}

	// close方法
	@Override
	public void close(TaskAttemptContext context) throws IOException, InterruptedException {
		// 关闭流
		IOUtils.closeStream(fosatguigu);
		IOUtils.closeStream(fosother);

	}

}
FilterOutputFormat

因为上面创建了FRecordWriter类,所在下面的类中直接返回,基本不用改

package com.imooc.myoutputformat;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FilterOutputFormat extends FileOutputFormat<Text,NullWritable>{

	@Override
	public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job)
			throws IOException, InterruptedException {
		
		return new FRecordWriter(job);
	}

}
Driver启动类

核心代码

// 设置(自定义outputFormat重点配置)(自定义outputFormat重点配置)(自定义outputFormat重点配置)
job.setOutputFormatClass(FilterOutputFormat.class);

// 6 设置输入路径和输出路径
FileInputFormat.setInputPaths(job, new Path(inputPath));

// 虽然我们自定义了outputformat,但是因为我们的outputformat继承fileoutputforma而fileoutputformat要输出一个_SUCCESS文件,所以,在这还得指定一个输出目录
FileOutputFormat.setOutputPath(job, new Path(outputPath));
package com.imooc.myoutputformat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueLineRecordReader;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FilterDriver {

	public static void main(String[] args) throws Exception {
		// 输入路径(处理E:\temp\input下的***文件)
		String inputPath = "E:\\temp\\input";
		// 输出路径(output文件夹不能存在,否则报错)
		String outputPath = "E:\\temp\\output";

		Configuration conf = new Configuration();
		// 设置切割符(KeyValueInputFormat重点配置)(KeyValueInputFormat重点配置)(KeyValueInputFormat重点配置)
		conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, " ");

		// 1 获取Job对象
		Job job = Job.getInstance(conf);

		// 2 设置jar存储位置
		job.setJarByClass(FilterDriver.class);

		// 3 关联Map和Reduce类
		job.setMapperClass(FilterMapper.class);
		job.setReducerClass(FilterReducer.class);

		// 4 设置Mapper阶段输出数据的key和value类型
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(NullWritable.class);

		// 5 设置最终(reducer)数据输出的key和value类型
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(NullWritable.class);

		// 设置(自定义outputFormat重点配置)(自定义outputFormat重点配置)(自定义outputFormat重点配置)
		job.setOutputFormatClass(FilterOutputFormat.class);

		// 6 设置输入路径和输出路径
		FileInputFormat.setInputPaths(job, new Path(inputPath));

		// 虽然我们自定义了outputformat,但是因为我们的outputformat继承自fileoutputformat
		// 而fileoutputformat要输出一个_SUCCESS文件,所以,在这还得指定一个输出目录
		FileOutputFormat.setOutputPath(job, new Path(outputPath));

		// 7 提交job
		// job.submit();
		job.waitForCompletion(true);

		System.out.println("-------OVER-----------");

	}

}

Reduce Join

注意:

一般业务逻辑都在map阶段处理,所以不推荐reduce join ,推荐map join

工作原理

  • Map端的主要工作:

为来自不同表或文件的key/value对,打标签以区别不同来源的记录。然后用连接字段作为key ,其余部分和新加的标志作为value,最后进行输出。

  • Reduce端的主要工作:

在Reduce端以连接字段作为key的分组已经完成,我们只需要在每一个分组当中将那些来源于不同文件的记录(在Map阶段已经打标志分开,最后进行合并就ok了。

Reduce Join案例实操

需求、输入数据、输出数据

(select id,pname,amount form t_order,t_product where t_order.pid=t_product.pid )

订单数据表t_order + 商品信息表t_product ====== 最终数据形式

id

pid

amount

1001

01

1

1002

02

2

1003

03

3

1004

01

4

1005

02

5

1006

03

6

pid

pname

01

小米

02

华为

03

格力

id

pname

amount

1001

小米

1

1004

小米

4

1002

华为

2

设计思路
  1. 创建一个(订单id,商品pid,数量amount,商品名称pname,标志位flag)实体类(包含两个表的所有属性)
  2. 在map阶段中根据切片的信息获取操作文件名称,并且封装对象,如果操作的名称为order.txt,flag="order”,否则flag="product"
  3. map的输出 key 为共同的属性 pid ,value为封装的实体类
  4. 同一个reduce里有两种数据,一种是pid为某某某的订单记录(多条),另一种是pid为某某某的商品记录(一条),而且有fiag标记位,很容易分开,然后就可以拼接成最终的数据形式了
编码

实体类

package com.imooc.reducejoin;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/**
 * @author 柴火
 *
 */
public class TableBean implements Writable {

	private String order_id; // 订单id
	private String p_id; // 产品id
	private int amount; // 产品数量
	private String pname; // 产品名称
	private String flag; // 表的标记

	public TableBean() {
		super();
	}

	@Override
	public void readFields(DataInput in) throws IOException {

		this.order_id = in.readUTF();
		this.p_id = in.readUTF();
		this.amount = in.readInt();
		this.pname = in.readUTF();
		this.flag = in.readUTF();

	}

	@Override
	public void write(DataOutput out) throws IOException {
		out.writeUTF(order_id);
		out.writeUTF(p_id);
		out.writeInt(amount);
		out.writeUTF(pname);
		out.writeUTF(flag);
	}

	@Override
	public String toString() {
		return order_id + "\t" + amount + "\t" + pname;
	}

	public String getOrder_id() {
		return order_id;
	}

	public void setOrder_id(String order_id) {
		this.order_id = order_id;
	}

	public String getP_id() {
		return p_id;
	}

	public void setP_id(String p_id) {
		this.p_id = p_id;
	}

	public int getAmount() {
		return amount;
	}

	public void setAmount(int amount) {
		this.amount = amount;
	}

	public String getPname() {
		return pname;
	}

	public void setPname(String pname) {
		this.pname = pname;
	}

	public String getFlag() {
		return flag;
	}

	public void setFlag(String flag) {
		this.flag = flag;
	}

}

mapper

package com.imooc.reducejoin;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class TableMapper extends Mapper<LongWritable, Text, Text, TableBean> {

	String name;
	TableBean bean = new TableBean();
	Text k = new Text();

	@Override
	protected void setup(Context context){

		// 1 获取输入文件切片
		FileSplit split = (FileSplit) context.getInputSplit();

		// 2 获取输入文件名称
		name = split.getPath().getName();
	}

	@Override
	protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

		// 1 获取输入数据
		String line = value.toString();

		// 2 不同文件分别处理
		if (name.startsWith("order")) {// 订单表处理

			// 2.1 切割
			String[] fields = line.split("\t");

			// 2.2 封装bean对象
			bean.setOrder_id(fields[0]);
			bean.setP_id(fields[1]);
			bean.setAmount(Integer.parseInt(fields[2]));
			bean.setPname("");
			bean.setFlag("order");

			k.set(fields[1]);
		} else {// 产品表处理

			// 2.3 切割
			String[] fields = line.split("\t");

			// 2.4 封装bean对象
			bean.setP_id(fields[0]);
			bean.setPname(fields[1]);
			bean.setFlag("pd");
			bean.setAmount(0);
			bean.setOrder_id("");

			k.set(fields[0]);
		}

		// 3 写出
		context.write(k, bean);
	}
}

Reducer

package com.imooc.reducejoin;

import java.io.IOException;
import java.util.ArrayList;

import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class TableReducer extends Reducer<Text, TableBean, TableBean, NullWritable> {

	@Override
	protected void reduce(Text key, Iterable<TableBean> values, Context context)
			throws IOException, InterruptedException {

		// 1准备存储订单的集合
		ArrayList<TableBean> orderBeans = new ArrayList<>();

		// 2 准备bean对象
		TableBean pdBean = new TableBean();

		for (TableBean bean : values) {

			if ("order".equals(bean.getFlag())) {// 订单表

				// 拷贝传递过来的每条订单数据到集合中
				TableBean orderBean = new TableBean();

				try {
					BeanUtils.copyProperties(orderBean, bean);
				} catch (Exception e) {
					e.printStackTrace();
				}

				orderBeans.add(orderBean);
			} else {// 产品表

				try {
					// 拷贝传递过来的产品表到内存中
					BeanUtils.copyProperties(pdBean, bean);
				} catch (Exception e) {
					e.printStackTrace();
				}
			}
		}

		// 3 表的拼接
		for (TableBean bean : orderBeans) {

			bean.setPname(pdBean.getPname());
			
			System.out.println(bean);

			// 4 数据写出去
			context.write(bean, NullWritable.get());
		}
	}
}

Driver驱动类

package com.imooc.reducejoin;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TableDriver {

	public static void main(String[] args) throws Exception {

		// 0 根据自己电脑路径重新配置
		args = new String[] { "e:/temp/input", "e:/temp/output" };

		// 1 获取配置信息,或者job对象实例
		Configuration configuration = new Configuration();
		Job job = Job.getInstance(configuration);

		// 2 指定本程序的jar包所在的本地路径
		job.setJarByClass(TableDriver.class);

		// 3 指定本业务job要使用的Mapper/Reducer业务类
		job.setMapperClass(TableMapper.class);
		job.setReducerClass(TableReducer.class);

		// 4 指定Mapper输出数据的kv类型
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(TableBean.class);

		// 5 指定最终输出的数据的kv类型
		job.setOutputKeyClass(TableBean.class);
		job.setOutputValueClass(NullWritable.class);

		// 6 指定job的输入原始文件所在目录
		FileInputFormat.setInputPaths(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		// 7 将job中配置的相关参数,以及job所用的java类所在的jar包, 提交给yarn去运行
		job.waitForCompletion(true);
		
		System.out.println("---------OVER------------");

	}
}

Map Join

使用场景

Map Join适用于一张表十分小、一张表很大的场景。

优点

思考:在Reduce端处理过多的表,非常容易产生数据倾斜。怎么办?

在Map端缓存多张表,提前处理业务逻辑,这样增加Map端业务,减少Reduce端数据的压力,尽可能的减少数据倾斜。

具体办法:采用DistributedCache

(1)在Mapper的setup阶段,将文件读取到缓存集合中。

(2)在驱动函数中加载缓存。

// 缓存普通文件到Task运行节点。

job.addCacheFile(new URI("file://e:/cache/pd.txt"));

MapJoin实操

需求、输入数据、输出数据同上面的ReduceJoin
编码步骤
  • 创建Mapper类
  • 重新setup方法,读小表
@Override
	protected void setup(Mapper<LongWritable, Text, Text, NullWritable>.Context context)
			throws IOException, InterruptedException {

		// 1 获取缓存的文件
		URI[] cacheFiles = context.getCacheFiles();
		String path = cacheFiles[0].getPath().toString();

		BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(path), "UTF-8"));

		String line;
		while (StringUtils.isNotEmpty(line = reader.readLine())) {

			// 2 切割
			String[] fields = line.split("\t");

			// 3 缓存数据到集合
			pdMap.put(fields[0], fields[1]);
		}

		// 4 关流
		reader.close();

	}
  • 重新map方法,进行join
  • 编写Driver驱动类
// 5、设置最终输出数据类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);

// 6 加载缓存数据
job.addCacheFile(new URI("file:///e:/temp/product.txt"));

// 7 Map端Join的逻辑不需要Reduce阶段,设置reduceTask数量为0
job.setNumReduceTasks(0);
编码

mapper

package com.imooc.mapjoin;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class DistributedCacheMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
	Map<String, String> pdMap = new HashMap<>();

	@Override
	protected void setup(Mapper<LongWritable, Text, Text, NullWritable>.Context context)
			throws IOException, InterruptedException {

		// 1 获取缓存的文件
		URI[] cacheFiles = context.getCacheFiles();
		String path = cacheFiles[0].getPath().toString();

		BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(path), "UTF-8"));

		String line;
		while (StringUtils.isNotEmpty(line = reader.readLine())) {

			// 2 切割
			String[] fields = line.split("\t");

			// 3 缓存数据到集合
			pdMap.put(fields[0], fields[1]);
		}

		// 4 关流
		reader.close();

	}

	Text k = new Text();

	@Override
	protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
		// 1 获取一行
		String line = value.toString();

		// 2 截取
		String[] fields = line.split("\t");

		// 3 获取产品id
		String pId = fields[1];

		// 4 获取商品名称
		String pdName = pdMap.get(pId);

		// 5 拼接
		k.set(line + "\t" + pdName);

		// 6 写出
		context.write(k, NullWritable.get());

	}
}

Driver

package com.imooc.mapjoin;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class DistributedCacheDriver {

	public static void main(String[] args) throws Exception {

		// 0 根据自己电脑路径重新配置
		args = new String[] { "e:/temp/input", "e:/temp/output" };

		// 1 获取job信息
		Configuration configuration = new Configuration();
		Job job = Job.getInstance(configuration);

		// 2 设置加载jar包路径
		job.setJarByClass(DistributedCacheDriver.class);

		// 3 关联map
		job.setMapperClass(DistributedCacheMapper.class);

		// 4、设置输入输出路径
		FileInputFormat.setInputPaths(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		// 5、设置最终输出数据类型
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(NullWritable.class);

		// 6 加载缓存数据
		job.addCacheFile(new URI("file:///e:/temp/product.txt"));

		// 7 Map端Join的逻辑不需要Reduce阶段,设置reduceTask数量为0
		job.setNumReduceTasks(0);

		// 8 提交
		boolean result = job.waitForCompletion(true);
		System.exit(result ? 0 : 1);
	}
}

倒排索引案例(多job串联)

敲黑板

多job串联不是配置在一起的,而是单独运行的,比如2个job串联就有2个驱动类,而不是一个驱动类

输入数据

期望输出数据

atguigu c.txt-->2 b.txt-->2 a.txt-->3

pingping c.txt-->1 b.txt-->3 a.txt-->1

ss c.txt-->1 b.txt-->1 a.txt-->2

操作步骤

job1

首先将a.txt

atguigu pingping
atguigu ss
atguigu ss

转换为

atguigu--a.txt	3
ss--a.txt 2
.....

切分,然后在setup中获得切片中文件的名称(a.txt),在map方法中将其拼接在atguigu后作为K,传入到reduce中,value为1

在reduce阶段进行累加

job2

job1的输出结果,即为job2的输出结果

atguigu--a.txt	3
atguigu--b.txt	2
atguigu--c.txt	2
pingping--a.txt	1
pingping--b.txt	3
pingping--c.txt	1
ss--a.txt	2
ss--b.txt	1
ss--c.txt	1

在map阶段将 (atguigu--a.txt 3)切分为 key=atguigu value=a.txt 3 ,传给reduce

在reduce阶段将key相同的values(a.txt 3 b.txt 3 c.txt 2 )进行拼接,从而得出预期结果

编码

如果上面看懂的话在这编码已经不是问题了,重要的是思路,思路,思路

学习资料

尚硅谷hadoop