博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
MapReduce 经典案例手机流量排序的分析
阅读量:5267 次
发布时间:2019-06-14

本文共 4564 字,大约阅读时间需要 15 分钟。

在进行流量排序之前,先要明白排序是发生在map阶段,排序之后(排序结束后map阶段才会显示100%完成)才会到reduce阶段(事实上reduce也会排序),.此外排序之前要已经完成了手机流量的统计工作,即把第一次mr的结果作为本次排序的输入.也就是说读取的数据格式为     手机号 上行流量 下行流量 总流量

1,map阶段,读取并封装流量信息,不同的是context.write()时key必须是封装的实体类,而不再是手机号

1 /** 2  * 输入key 行号 3  * 输入value 流量信息 4  * 输出key 封装了流量信息的FlowBean 5  * 输出value 手机号 6  * @author tele 7  * 8  */ 9 public class FlowSortMapper extends Mapper
{10 FlowBean flow = new FlowBean();11 Text v = new Text();12 //读取的内容格式 手机号 上行流量 下行流量 总流量13 @Override14 protected void map(LongWritable key, Text value, Mapper
.Context context)15 throws IOException, InterruptedException {16 17 //1.读取18 String line = value.toString();19 20 //2.切割21 String[] split = line.split("\t");22 String upFlow = split[1];23 String downFlow = split[2];24 String phoneNum = split[0];25 26 //3.封装流量信息27 flow.set(Long.parseLong(upFlow),Long.parseLong(downFlow));28 29 v.set(phoneNum);30 31 //4.写出32 context.write(flow,v);33 34 }35 }

2.map之后会根据key进行排序,因此如果要实现自定义排序,必须让定义的bean实现WritableComparable接口,并重写其中的compare方法,我们只需要告诉MapReduce根据什么排序,升序还是降序就可以了

具体的排序过程由MapReduce完成

1 public class FlowBean implements WritableComparable
{ 2 private long upFlow; 3 public long getUpFlow() { 4 return upFlow; 5 } 6 public void setUpFlow(long upFlow) { 7 this.upFlow = upFlow; 8 } 9 public long getDownFlow() {10 return downFlow;11 }12 public void setDownFlow(long downFlow) {13 this.downFlow = downFlow;14 }15 public long getSumFlow() {16 return sumFlow;17 }18 public void setSumFlow(long sumFlow) {19 this.sumFlow = sumFlow;20 }21 private long downFlow;22 private long sumFlow;23 24 /**25 * 反序列化时需要通过反射调用空参构造方法.必须有空参构造26 */27 public FlowBean() {28 super();29 }30 31 public FlowBean(long upFlow, long downFlow) {32 super();33 this.upFlow = upFlow;34 this.downFlow = downFlow;35 this.sumFlow = upFlow + downFlow;36 }37 38 public void set(long upFlow, long downFlow) {39 this.upFlow = upFlow;40 this.downFlow = downFlow;41 this.sumFlow = upFlow + downFlow;42 }43 44 45 /**46 * 序列化与反序列化顺序必须一致47 */48 49 50 //序列化51 @Override52 public void write(DataOutput output) throws IOException {53 output.writeLong(upFlow);54 output.writeLong(downFlow);55 output.writeLong(sumFlow);56 57 }58 59 60 //反序列化61 @Override62 public void readFields(DataInput input) throws IOException {63 upFlow = input.readLong();64 downFlow = input.readLong();65 sumFlow = input.readLong();66 }67 68 /**69 * reduce context.write()会调用此方法70 */71 @Override72 public String toString() {73 return upFlow + "\t" + downFlow + "\t" + sumFlow;74 }75 76 77 @Override78 public int compareTo(FlowBean o) {79 // -1表示不交换位置,即降序,1表示交换位置,升序80 return this.sumFlow > o.getSumFlow() ? -1:1;81 }82 83 }

3.reduce阶段,map阶段会对输出的value根据key进行分组,具有相同key的value会被划分到一组,这样reduce阶段执行一次reduce()读取一组,由于map阶段输出的key是定义的FlowBean,因此key是唯一的,从而

每组只有一个值,即Iterable<Text> value中只有一个值,也就是只有一个手机号

1 /** 2  * 输出的格式仍然为 手机号 上行流量 下行流量 总流量 3  * @author tele 4  * 5  */ 6 public class FlowSortReducer extends Reducer
{ 7 /** 8 * reduce阶段读入的仍然是一组排好序的数据 9 * 前面map阶段输出的结果已根据key(FlowBean)进行分组,但由于此处key的唯一10 * 所以一组只有一个数据,即 Iterable
value 中只有一个值11 */12 @Override13 protected void reduce(FlowBean key, Iterable
value, Reducer
.Context context)14 throws IOException, InterruptedException {15 16 //输出17 Text phone = value.iterator().next();18 context.write(phone,key);19 20 21 }22 }

下面进行debug,在map(),reduce()方法的开始与结束均打上断点,在FlowBean的compareTo()中也打上断点

map读取的内容

写出,注意key是FlowBean对象

接下来是排序,可以看到排序时map仍然不是100%,也就是说map阶段进行了排序(reduce阶段也会进行排序)

排序之后进入reduce阶段,reduce时write会调用FlowBean的toString()把结果输出到磁盘上

reduce除了归并排序之外,在执行write时同样会进行一次排序,执行第一组的write,(会调用FlowBean的toString()).但接下来还会去执行compareTo方法,此时在磁盘上生成的是临时目录,并且生成的part000文件是0KB,在执行完第二组的write之后才会真正把第一组数据写出到磁盘上

 

 

 

 part000此时有了数据

 

这样看来我们重写的compareTo方法无论在map阶段还是reduce阶段都被调用了

 

转载于:https://www.cnblogs.com/tele-share/p/9633263.html

你可能感兴趣的文章
Oracle-05
查看>>
linux grep 搜索查找
查看>>
Not enough free disk space on disk '/boot'(转载)
查看>>
android 签名
查看>>
android:scaleType属性
查看>>
mysql-5.7 innodb 的并行任务调度详解
查看>>
shell脚本
查看>>
Upload Image to .NET Core 2.1 API
查看>>
Js时间处理
查看>>
Java项目xml相关配置
查看>>
三维变换概述
查看>>
vue route 跳转
查看>>
【雷电】源代码分析(二)-- 进入游戏攻击
查看>>
Entityframework:“System.Data.Entity.Internal.AppConfig”的类型初始值设定项引发异常。...
查看>>
Linux中防火墙centos
查看>>
mysql新建用户,用户授权,删除用户,修改密码
查看>>
JS博客
查看>>
如何设置映射网络驱动器的具体步骤和方法
查看>>
ASP.NET WebApi 基于OAuth2.0实现Token签名认证
查看>>
283. Move Zeroes把零放在最后面
查看>>