Hadoop- Traffic Summation: Implementing Hadoop's Serialization Interface (Writable), with Code

 

Traffic summation program requirements

For each user (phone number), compute the total upstream traffic, total downstream traffic, and overall total traffic consumed.

Process breakdown

Map phase:

Read one line of input and split it into fields,

extract the phone number, upstream traffic, and downstream traffic,

context.write(phone number, bean)

Reduce phase:

Iterate over all beans for a phone number, accumulating the upstream and downstream traffic separately into a new bean,

context.write(phone number, new bean);
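As a worked trace (the numbers here are made up purely for illustration): if the map phase emits (13726232222, up=100/down=200) and (13726232222, up=50/down=80), the shuffle groups both beans under the key 13726232222, and the reducer emits a single bean with up=150, down=280, sum=430.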

1. Define a FlowBean:

package com.Rz_Lee.hadoop.mr.flowsum;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
 * Created by Rz_Lee on 2017/8/15.
 */
public class FlowBean implements Writable{
    private long upFlow;
    private long dFlow;
    private long sumFlow;
    //Deserialization instantiates the bean by reflection through the no-arg constructor, so one must be defined explicitly
    public FlowBean() {
    }
    public FlowBean(long upFlow, long dFlow) {
        this.upFlow = upFlow;
        this.dFlow = dFlow;
        this.sumFlow = dFlow+upFlow;
    }
    public long getUpFlow() {
        return upFlow;
    }
    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }
    public long getdFlow() {
        return dFlow;
    }
    public void setdFlow(long dFlow) {
        this.dFlow = dFlow;
    }
    public long getSumFlow() {
        return sumFlow;
    }
    @Override
    public String toString() {
        return upFlow+"\t"+dFlow+"\t"+sumFlow;
    }
    /**
     * Serialization method
     * @param dataOutput
     * @throws IOException
     */
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeLong(upFlow);
        dataOutput.writeLong(dFlow);
        dataOutput.writeLong(sumFlow);
    }
    /**
     * Deserialization method
     * Note: fields must be read back in exactly the same order they were written
     * @param dataInput
     * @throws IOException
     */
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        upFlow = dataInput.readLong();
        dFlow = dataInput.readLong();
        sumFlow = dataInput.readLong();
    }
}
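The write/readFields pair above must agree on field order; a quick round-trip check makes that contract easy to verify. Below is a minimal sketch of such a check (the class FlowBeanRoundTrip is hypothetical, not part of the original project):

package com.Rz_Lee.hadoop.mr.flowsum;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
/**
 * Hypothetical round-trip check for FlowBean serialization (illustration only).
 */
public class FlowBeanRoundTrip {
    public static void main(String[] args) throws IOException {
        FlowBean original = new FlowBean(2586, 24681);
        //Serialize to a byte array, as Hadoop does when shuffling map output
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bos));
        //Deserialize into a fresh bean created via the no-arg constructor,
        //mirroring how Hadoop instantiates Writables by reflection
        FlowBean copy = new FlowBean();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));
        //Both lines should print the same tab-separated values: 2586, 24681, 27267
        System.out.println(original);
        System.out.println(copy);
    }
}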

2. The job implementation (Mapper, Reducer, and driver):

package com.Rz_Lee.hadoop.mr.flowsum;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
 * Created by Rz_Lee on 2017/8/15.
 */
public class FlowCount {
    static class FlowCountMapper extends Mapper<LongWritable,Text,Text,FlowBean>{
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            //Convert the line to a String
            String line = value.toString();
            //Split into tab-separated fields
            String[] fields = line.split("\t");
            //Extract the phone number
            String phoneNbr = fields[1];
            //Extract the upstream and downstream traffic, indexed from the end of
            //the line because the number of fields per record varies
            long upFlow = Long.parseLong(fields[fields.length-3]);
            long dFlow = Long.parseLong(fields[fields.length-2]);
            context.write(new Text(phoneNbr),new FlowBean(upFlow,dFlow));
        }
    }
    static class FlowCountReducer extends Reducer<Text,FlowBean,Text,FlowBean>
    {
        //<135,bean1><135,bean2><135,bean3>
        @Override
        protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
            long sum_upFlow = 0;
            long sum_dFlow = 0;
            //Iterate over all beans, accumulating the upstream and downstream traffic separately
            for(FlowBean bean:values){
                sum_upFlow+=bean.getUpFlow();
                sum_dFlow+=bean.getdFlow();
            }
            FlowBean resultBean = new FlowBean(sum_upFlow, sum_dFlow);
            context.write(key,resultBean);
        }
    }
    public static void main(String[] args) throws Exception{
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        /*conf.set("mapreduce.framework.name","yarn");
        conf.set("yarn.resourcemanager.hostname","srv01");*/
        /*job.setJar("/usr/hadoop/wc.jar");*/
        //Specify the local path of this program's jar via its main class
        job.setJarByClass(FlowCount.class);
        //Specify the Mapper/Reducer classes this job uses
        job.setMapperClass(FlowCountMapper.class);
        job.setReducerClass(FlowCountReducer.class);
        //Specify the KV types of the mapper output
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        //Specify the KV types of the final output
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        //Specify the directory containing the job's input files
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        //Specify the directory for the job's output
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        //Submit the job's configuration and jar to YARN and wait for completion
        /*job.submit();*/
        boolean res = job.waitForCompletion(true);
        System.exit(res?0:1);
    }
}

3. Input data, phone.txt (tab-separated):

1363157985123   13726232222 50-FD-07-A4-72-B8:CMCC  120.196.100.82  i.cnblogs.com       24  27  2586    24681   200
1363157995456   13826547777 5C-0E-88-C7-F2-E0:CMCC  10.197.40.4         4   0   364 0   200
1363157991789   13926438888 20-10-7A-28-CC-0A:CMCC  120.197.100.99          2   4   232 2151    200
1363154400101   13926259999 CC-0E-8B-8B-B1-50:CMCC  120.196.40.4            4   0   440 0   200
1363157993121   18211575555 94-17-AC-CD-E6-18:CMCC-EASY 120.196.100.99  www.bilibili.com    视频网站    20  15  8585    2106    200
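Note that the records do not all have the same number of columns (the hostname/URL fields are sometimes absent), which is why the mapper indexes the traffic fields from the end of the line (fields.length-3 and fields.length-2) rather than from the front. A minimal standalone sketch of that extraction logic (the class name LineParseDemo is made up for illustration):

/**
 * Hypothetical demo of the mapper's field extraction (illustration only).
 */
public class LineParseDemo {
    public static void main(String[] args) {
        String line = "1363157985123\t13726232222\t50-FD-07-A4-72-B8:CMCC\t120.196.100.82\ti.cnblogs.com\t24\t27\t2586\t24681\t200";
        String[] fields = line.split("\t");
        String phoneNbr = fields[1];                              //second column is the phone number
        long upFlow = Long.parseLong(fields[fields.length - 3]);  //third-from-last column
        long dFlow = Long.parseLong(fields[fields.length - 2]);   //second-from-last column
        System.out.println(phoneNbr + "\t" + upFlow + "\t" + dFlow); //prints: 13726232222, 2586, 24681
    }
}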

4. Export the FlowCount project as a jar, upload the input data to HDFS, and run: hadoop jar flowcount.jar package.ClassName /input/path /output/directory
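With the five sample records above, the job's output file (part-r-00000 under the output directory, assuming the default single reducer) should contain the following, derived by hand from the mapper/reducer logic; the columns are upstream, downstream, and total traffic:

13726232222	2586	24681	27267
13826547777	364	0	364
13926259999	440	0	440
13926438888	232	2151	2383
18211575555	8585	2106	10691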

 

Open a browser and go to <YARN node IP>:8088 to watch the whole job's progress in the ResourceManager web UI.

 

 
