Hadoop学习:深入解析MapReduce的大数据魔力(二)
3.3 Shuffle 机制
3.3.1 Shuffle 机制
Map 方法之后,Reduce方法之前的数据处理过程称之为Shuffle。
3.3.2 Partition 分区
1、问题引出 要求将统计结果按照条件输出到不同文件中(分区)。比如:将统计结果按照手机 归属地不同省份输出到不同文件中(分区) 2、默认Partitioner分区
public class HashPartitioner<K, V> extends Partitioner<K, V> {
}
public int getPartition(K key, V value, int numReduceTasks) {
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
默认分区是根据key的hashCode对ReduceTasks个数取模得到的。用户没法控制哪个key存储到哪个分区。 3、自定义Partitioner步骤 (1)自定义类继承Partitioner,重写getPartition()方法
public class CustomPartitioner extends Partitioner<Text, FlowBean> {
@Override
public int getPartition(Text key, FlowBean value, int numPartitions) {
// 控制分区代码逻辑
… …
}
}
return partition;
(2)在Job驱动中,设置自定义Partitioner
job.setPartitionerClass(CustomPartitioner.class);
(3)自定义Partition后,要根据自定义Partitioner的逻辑设置相应数量的ReduceTask
job.setNumReduceTasks(5);
4、分区总结 (1)如果ReduceTask的数量>getPartition的结果数,则会多产生几个空的输出文件part-r-000xx; (2)如果1<ReduceTask的数量<getPartition的结果数,则有一部分分区数据无处安放,会Exception; (3)如果ReduceTask的数量=1,则不管MapTask端输出多少个分区文件,最终结果都交给这一个 ReduceTask,最终也就只会产生一个结果文件part-r-00000; (4)分区号必须从零开始,逐一累加。 5、案例分析 例如:假设自定义分区数为5,则 (1)job.setNumReduceTasks(1); 会正常运行,只不过会产生一个输出文件 (2)job.setNumReduceTasks(2);会报错 (3)job.setNumReduceTasks(6); 大于5,程序会正常运行,会产生空文件
3.3.3 Partition 分区案例实操
1)需求 将统计结果按照手机归属地不同省份输出到不同文件中(分区) (1)输入数据
2)期望输出数据 手机号136、137、138、139开头都分别放到一个独立的4个文件中,其他开头的放到 一个文件中。 2)需求分析 1、需求:将统计结果按照手机归属地不同省份输出到不同文件中(分区)
3)在案例2.3的基础上,增加一个分区类
package com.atguigu.mapreduce.partitioner;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class ProvincePartitioner extends Partitioner<Text, FlowBean> {
@Override
public int getPartition(Text text, FlowBean flowBean, int numPartitions)
{
//获取手机号前三位prePhone
String phone = text.toString();
String prePhone = phone.substring(0, 3);
//定义一个分区号变量partition,根据prePhone设置分区号
int partition;
if("136".equals(prePhone)){
partition = 0;
}else if("137".equals(prePhone)){
partition = 1;
}else if("138".equals(prePhone)){
partition = 2;
}else if("139".equals(prePhone)){
partition = 3;
}else {
partition = 4;
}
//最后返回分区号partition
return partition;
}
}
4)在驱动函数中增加自定义数据分区设置和ReduceTask设置
package com.atguigu.mapreduce.partitioner;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class FlowDriver {
public static void main(String[] args) throws IOException,
ClassNotFoundException, InterruptedException {
//1 获取job对象
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
//2 关联本Driver类
job.setJarByClass(FlowDriver.class);
//3 关联Mapper和Reducer
job.setMapperClass(FlowMapper.class);
job.setReducerClass(FlowReducer.class);
//4 设置Map端输出数据的KV类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class);
//5 设置程序最终输出的KV类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
//8 指定自定义分区器
job.setPartitionerClass(ProvincePartitioner.class);
//9 同时指定相应数量的ReduceTask
job.setNumReduceTasks(5);
//6 设置输入输出路径
FileInputFormat.setInputPaths(job, new Path("D:\\inputflow"));
FileOutputFormat.setOutputPath(job, new Path("D\\partitionout"));
//7 提交Job
boolean b = job.waitForCompletion(true);
System.exit(b ? 0 : 1);
}
}
3.3.4 WritableComparable 排序
排序概述 排序是MapReduce框架中最重要的操作之一。
MapTask和ReduceTask均会对数据按照key进行排序。该操作属于 Hadoop的默认行为。任何应用程序中的数据均会被排序,而不管逻辑上是 否需要。
默认排序是按照字典顺序排序,且实现该排序的方法是快速排序。
对于MapTask,它会将处理的结果暂时放到环形缓冲区中,当环形缓冲区使 用率达到一定阈值后,再对缓冲区中的数据进行一次快速排序,并将这些有序数 据溢写到磁盘上,而当数据处理完毕后,它会对磁盘上所有文件进行归并排序。 对于ReduceTask,它从每个MapTask上远程拷贝相应的数据文件,如果文件大 小超过一定阈值,则溢写磁盘上,否则存储在内存中。如果磁盘上文件数目达到 一定阈值,则进行一次归并排序以生成一个更大文件;如果内存中文件大小或者 数目超过一定阈值,则进行一次合并后将数据溢写到磁盘上。当所有数据拷贝完 毕后,ReduceTask统一对内存和磁盘上的所有数据进行一次归并排序。
排序分类
(1)部分排序 MapReduce根据输入记录的键对数据集排序。保证输出的每个文件内部有序。 (2)全排序 最终输出结果只有一个文件,且文件内部有序。实现方式是只设置一个ReduceTask。但该方法在 处理大型文件时效率极低,因为一台机器处理所有文件,完全丧失了MapReduce所提供的并行架构。 (3)辅助排序:(GroupingComparator分组) (4)二次排序 在Reduce端对key进行分组。应用于:在接收的key为bean对象时,想让一个或几个字段相同(全部 字段比较不相同)的key进入到同一个reduce方法时,可以采用分组排序。 在自定义排序过程中,如果compareTo中的判断条件为两个即为二次排序。
自定义排序WritableComparable原理分析 bean 对象做为key传输,需要实现WritableComparable接口重写compareTo方法,就可以实现排序。
@Override
public int compareTo(FlowBean bean) {
int result;
// 按照总流量大小,倒序排列
if (this.sumFlow > bean.getSumFlow()) {
result = -1;
}else if (this.sumFlow < bean.getSumFlow()) {
result = 1;
}else {
result = 0;
}
return result;
}
3.3.5 Combiner 合并
(1)Combiner是MR程序中Mapper和Reducer之外的一种组件。 (2)Combiner组件的父类就是Reducer。 (3)Combiner和Reducer的区别在于运行的位置 Combiner是在每一个MapTask所在的节点运行; Reducer是接收全局所有Mapper的输出结果; 应该跟Reducer的输入kv类型要对应起来。 Mapper (4)Combiner的意义就是对每一个MapTask的输出进行局部汇总,以减小网络传输量。 (5)Combiner能够应用的前提是不能影响最终的业务逻辑,而且,Combiner的输出kv应该跟Reducer的输入kv类型要对应起来。
(6)自定义Combiner实现步骤 (a)自定义一个Combiner继承Reducer,重写Reduce方法
public class WordCountCombiner extends Reducer<Text, IntWritable, Text,
IntWritable> {
private IntWritable outV = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context
context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable value : values) {
sum += value.get();
}
outV.set(sum);
context.write(key,outV);
}
}
(b)在Job驱动类中设置:
job.setCombinerClass(WordCountCombiner.class);
3.4 OutputFormat 数据输出
3.4.1 OutputFormat 接口实现类
OutputFormat是MapReduce输出的基类,所有实现MapReduce输出都实现了OutputFormat 接口。下面我们介绍几种常见的OutputFormat实现类。 1.OutputFormat实现类 2.默认输出格式TextOutputFormat 3.自定义OutputFormat 3.1 应用场景: 例如:输出数据到MySQL/HBase/Elasticsearch等存储框架中。 3.2 自定义OutputFormat步骤 ➢ 自定义一个类继承FileOutputFormat。 ➢ 改写RecordWriter,具体改写输出数据的方法write()。
3.4.2 自定义OutputFormat案例实操
1)需求 过滤输入的log日志,包含atguigu的网站输出到e:/atguigu.log,不包含atguigu的网站输出到e:/other.log。 (1)输入数据
(2)期望输出数据
2)需求分析
3)案例实操 (1)编写LogMapper类
package com.atguigu.mapreduce.outputformat;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public
class
LogMapper
extends
Mapper<LongWritable,
Text,Text,
NullWritable> {
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
//不做任何处理,直接写出一行log数据
context.write(value,NullWritable.get());
}
}
(2)编写LogReducer类
package com.atguigu.mapreduce.outputformat;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class LogReducer extends Reducer<Text, NullWritable,Text,
NullWritable> {
@Override
protected void reduce(Text key, Iterable<NullWritable> values, Context
context) throws IOException, InterruptedException {
// 防止有相同的数据,迭代写出
for (NullWritable value : values) {
context.write(key,NullWritable.get());
}
}
}
(3)自定义一个LogOutputFormat类
package com.atguigu.mapreduce.outputformat;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class LogOutputFormat extends FileOutputFormat<Text, NullWritable>
{
@Override
public RecordWriter<Text, NullWritable>
getRecordWriter(TaskAttemptContext job) throws IOException,
InterruptedException {
//创建一个自定义的RecordWriter返回
LogRecordWriter logRecordWriter = new LogRecordWriter(job);
return logRecordWriter;
}
}
(4)编写LogRecordWriter类
package com.atguigu.mapreduce.outputformat;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import java.io.IOException;
public class LogRecordWriter extends RecordWriter<Text, NullWritable> {
private FSDataOutputStream atguiguOut;
private FSDataOutputStream otherOut;
public LogRecordWriter(TaskAttemptContext job) {
try {
//获取文件系统对象
FileSystem fs = FileSystem.get(job.getConfiguration());
//用文件系统对象创建两个输出流对应不同的目录
atguiguOut = fs.create(new Path("d:/hadoop/atguigu.log"));
otherOut = fs.create(new Path("d:/hadoop/other.log"));
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void write(Text key, NullWritable value) throws IOException,
InterruptedException {
String log = key.toString();
//根据一行的log数据是否包含atguigu,判断两条输出流输出的内容
if (log.contains("atguigu")) {
atguiguOut.writeBytes(log + "\n");
} else {
otherOut.writeBytes(log + "\n");
}
}
@Override
public void close(TaskAttemptContext context) throws IOException,
InterruptedException {
//关流
IOUtils.closeStream(atguiguOut);
IOUtils.closeStream(otherOut);
}
}
(5)编写LogDriver类
package com.atguigu.mapreduce.outputformat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class LogDriver {
public static void main(String[] args) throws IOException,
ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(LogDriver.class);
job.setMapperClass(LogMapper.class);
job.setReducerClass(LogReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
———————————————————————————
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
//设置自定义的outputformat
job.setOutputFormatClass(LogOutputFormat.class);
FileInputFormat.setInputPaths(job, new Path("D:\\input"));
//虽然我们自定义了 outputformat,但是因为我们的 outputformat 继承自
fileoutputformat
//而fileoutputformat 要输出一个_SUCCESS文件,所以在这还得指定一个输出目录
FileOutputFormat.setOutputPath(job, new Path("D:\\logoutput"));
boolean b = job.waitForCompletion(true);
System.exit(b ? 0 : 1);
}
}