Traffic summary program: requirements
For each user (identified by phone number), compute the total upstream traffic, total downstream traffic, and overall total traffic consumed.
Process analysis
Map phase:
Read one line of data and split it into fields,
extract the phone number, upstream traffic and downstream traffic,
then call context.write(phoneNumber, bean).
Reduce phase:
Iterate over all the beans received for one key, accumulating their upstream and downstream traffic into a new bean,
then call context.write(phoneNumber, newBean); a short worked example of this flow follows.
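As an illustration (this example is not part of the original analysis; the numbers come from the sample data in phone.txt further below): for the record belonging to phone 13726232222, the mapper emits the pair (13726232222, bean(upFlow=2586, dFlow=24681)). The shuffle phase groups all beans that share the same phone number, and the reducer adds up their upstream and downstream values, producing one output bean per phone whose grand total is upFlow + dFlow (here 2586 + 24681 = 27267).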
Code implementation
1. Define a FlowBean (the custom Writable used as the map output value):
package com.Rz_Lee.hadoop.mr.flowsum;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * Created by Rz_Lee on 2017/8/15.
 */
public class FlowBean implements Writable {

    private long upFlow;
    private long dFlow;
    private long sumFlow;

    // Deserialization invokes the no-arg constructor via reflection, so one must be defined explicitly
    public FlowBean() {
    }

    public FlowBean(long upFlow, long dFlow) {
        this.upFlow = upFlow;
        this.dFlow = dFlow;
        this.sumFlow = dFlow + upFlow;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getdFlow() {
        return dFlow;
    }

    public void setdFlow(long dFlow) {
        this.dFlow = dFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    @Override
    public String toString() {
        return upFlow + "\t" + dFlow + "\t" + sumFlow;
    }

    /**
     * Serialization method
     * @param dataOutput
     * @throws IOException
     */
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeLong(upFlow);
        dataOutput.writeLong(dFlow);
        dataOutput.writeLong(sumFlow);
    }

    /**
     * Deserialization method
     * Note: the fields must be read in the same order they were written
     * @param dataInput
     * @throws IOException
     */
    public void readFields(DataInput dataInput) throws IOException {
        upFlow = dataInput.readLong();
        dFlow = dataInput.readLong();
        sumFlow = dataInput.readLong();
    }
}
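As a quick sanity check (a minimal sketch that is not part of the original post; the class name FlowBeanRoundTrip is hypothetical, and the streams used are standard JDK classes), the Writable round trip of FlowBean can be verified locally by writing a bean to a byte buffer and reading it back:

package com.Rz_Lee.hadoop.mr.flowsum;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

public class FlowBeanRoundTrip {
    public static void main(String[] args) throws Exception {
        FlowBean original = new FlowBean(2586, 24681);

        // Serialize: write the bean's three long fields into an in-memory buffer
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        original.write(new DataOutputStream(buffer));

        // Deserialize: read the fields back in the same order they were written
        FlowBean copy = new FlowBean();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));

        // Both lines should print: 2586	24681	27267
        System.out.println(original);
        System.out.println(copy);
    }
}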
2. The implementation class:
package com.Rz_Lee.hadoop.mr.flowsum;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * Created by Rz_Lee on 2017/8/15.
 */
public class FlowCount {

    static class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Convert the line to a String
            String line = value.toString();
            // Split into fields
            String[] fields = line.split("\t");
            // Extract the phone number
            String phoneNbr = fields[1];
            // Extract upstream and downstream traffic (counted from the end of the record)
            long upFlow = Long.parseLong(fields[fields.length - 3]);
            long dFlow = Long.parseLong(fields[fields.length - 2]);

            context.write(new Text(phoneNbr), new FlowBean(upFlow, dFlow));
        }
    }

    static class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
        // Input groups look like: <135,bean1> <135,bean2> <135,bean3>
        @Override
        protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
            long sum_upFlow = 0;
            long sum_dFlow = 0;
            // Iterate over all beans, accumulating upstream and downstream traffic separately
            for (FlowBean bean : values) {
                sum_upFlow += bean.getUpFlow();
                sum_dFlow += bean.getdFlow();
            }
            FlowBean resultBean = new FlowBean(sum_upFlow, sum_dFlow);
            context.write(key, resultBean);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        /*conf.set("mapreduce.framework.name","yarn");
        conf.set("yarn.resourcemanager.hostname","srv01");*/
        /*job.setJar("/usr/hadoop/wc.jar");*/

        // Specify the local path of the jar containing this program
        job.setJarByClass(FlowCount.class);

        // Specify the mapper/reducer classes used by this job
        job.setMapperClass(FlowCountMapper.class);
        job.setReducerClass(FlowCountReducer.class);

        // Specify the mapper output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // Specify the final output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // Specify the directory of the job's raw input files
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // Specify the directory for the job's output
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Submit the job configuration and the jar containing the job's classes to YARN
        /*job.submit();*/
        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}
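For reference, a standalone sketch (illustrative only and not part of the original post; the class name FieldExtractionDemo is hypothetical, and the sample line is the first record of phone.txt below) showing how the mapper's field extraction behaves on one input record:

public class FieldExtractionDemo {
    public static void main(String[] args) {
        // One record from phone.txt; in the real file the fields are tab-separated
        String line = "1363157985123\t13726232222\t50-FD-07-A4-72-B8:CMCC\t120.196.100.82\ti.cnblogs.com\t24\t27\t2586\t24681\t200";

        String[] fields = line.split("\t");
        String phoneNbr = fields[1];                              // 13726232222
        long upFlow = Long.parseLong(fields[fields.length - 3]);  // 2586
        long dFlow = Long.parseLong(fields[fields.length - 2]);   // 24681

        // The mapper above indexes from the end of the record, which keeps the
        // parsing stable when the number of middle columns (such as the visited URL) varies
        System.out.println(phoneNbr + " up=" + upFlow + " down=" + dFlow);
    }
}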
3. The source data, phone.txt:
1363157985123  13726232222  50-FD-07-A4-72-B8:CMCC       120.196.100.82  i.cnblogs.com              24  27  2586  24681  200
1363157995456  13826547777  5C-0E-88-C7-F2-E0:CMCC       10.197.40.4                                4   0   364   0      200
1363157991789  13926438888  20-10-7A-28-CC-0A:CMCC       120.197.100.99                             2   4   232   2151   200
1363154400101  13926259999  CC-0E-8B-8B-B1-50:CMCC       120.196.40.4                               4   0   440   0      200
1363157993121  18211575555  94-17-AC-CD-E6-18:CMCC-EASY  120.196.100.99  www.bilibili.com 视频网站   20  15  8585  2106   200
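With the default TextOutputFormat (the job above does not set an explicit output format), each result line is the reducer's key followed by FlowBean.toString(): phone number, total upstream, total downstream and grand total, separated by tabs. For the first record above, its group contains a single bean, so the output line would read 13726232222 2586 24681 27267.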
4. Package the FlowCount project as a jar, upload it together with the source data to HDFS, and run: hadoop jar wordcount.jar <package.ClassName> <input path> <output directory>
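For example (the jar file name and the HDFS paths below are illustrative assumptions; only the class name comes from the code above):

hadoop jar wordcount.jar com.Rz_Lee.hadoop.mr.flowsum.FlowCount /flowsum/input /flowsum/output

Note that the output directory must not already exist, otherwise FileOutputFormat will fail the job.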
Open a browser and go to <IP of the YARN ResourceManager node>:8088; the running status of the whole job can be watched on that page.