需求:读取指定目录的数据,并实现单词计数的功能
实现方案:
Spout来读取指定目录的数据,作为后续Bolt处理的input
使用一个Bolt把input的数据切割分开,我们按照逗号进行分割
使用一个Bolt来进行最终的单词次数统计操作并输出
拓扑设计:DataSourceSpout ==> SplitBolt ==> WordCountBlot
Storm编程注意:Topology,Spout,Bolt等命名不能重复,上到集群时需要注意,如果出现重复命名会报错。
package com.imooc.bigdata;
import org.apache.commons.io.FileUtils;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import java.io.File;
import java.io.IOException;
import java.util.*;
/**
* 使用Storm完成词频统计功能
*/
public class LocalWordCountStormTopology {
/**
 * Spout that reads text files from a directory and emits every line as a
 * one-field tuple ("line").
 *
 * <p>A file is considered unprocessed while its name ends in ".txt". After its
 * lines have been emitted the file is renamed by appending the current time in
 * milliseconds, so that the next scan no longer matches it.
 */
public static class DataSourceSpout extends BaseRichSpout{
    /** Directory scanned when the no-argument constructor is used. */
    private static final String DEFAULT_DIRECTORY = "E:\\iso\\linux";

    /** Extensions (without the dot) that mark a file as unprocessed input. */
    private static final String[] INPUT_EXTENSIONS = new String[]{"txt"};

    /** Directory that is scanned recursively for input files. */
    private final String directory;

    private SpoutOutputCollector collector;

    /**
     * Absolute paths of files whose lines were emitted but which could not be
     * renamed afterwards. They are skipped on later scans so that a failed
     * rename does not cause the same lines to be emitted again and again.
     */
    private Set<String> emittedButNotRenamed;

    /** Creates a spout that reads from the default directory. */
    public DataSourceSpout() {
        this(DEFAULT_DIRECTORY);
    }

    /**
     * Creates a spout that reads from the given directory.
     *
     * @param directory path of the directory to scan recursively
     */
    public DataSourceSpout(String directory) {
        this.directory = directory;
    }

    /**
     * Stores the collector and resets the per-task bookkeeping.
     *
     * @param conf      topology configuration (unused)
     * @param context   topology context (unused)
     * @param collector collector used to emit the lines
     */
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        this.emittedButNotRenamed = new HashSet<String>();
    }

    /**
     * Scans the input directory once:
     * 1) lists all unprocessed files, including those in sub-directories,
     * 2) emits every line of each file,
     * 3) renames the file so that it is not picked up again.
     *
     * <p>Does nothing while the input directory does not exist.
     */
    @Override
    public void nextTuple() {
        File root = new File(directory);
        if (!root.isDirectory()) {
            // FileUtils.listFiles throws for a path that is not a directory;
            // returning lets the spout wait until the directory shows up.
            return;
        }

        Collection<File> files = FileUtils.listFiles(root, INPUT_EXTENSIONS, true);
        for (File file: files){
            String path = file.getAbsolutePath();
            if (emittedButNotRenamed.contains(path)) {
                continue;
            }

            List<String> lines;
            try {
                lines = FileUtils.readLines(file);
            } catch (IOException e) {
                // Nothing was emitted for this file, so it is safe to retry on the next scan.
                System.err.println("DataSourceSpout: failed to read " + path);
                e.printStackTrace();
                continue;
            }

            for (String line: lines){
                this.collector.emit(new Values(line));
            }

            try {
                // Rename the processed file; otherwise it would be read again on the next scan.
                FileUtils.moveFile(file, new File(path + System.currentTimeMillis()));
            } catch (IOException e) {
                // The lines are already emitted: remember the file instead of re-emitting it.
                emittedButNotRenamed.add(path);
                System.err.println("DataSourceSpout: failed to rename " + path);
                e.printStackTrace();
            }
        }
    }

    /** Declares the single output field "line". */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("line"));
    }
}
/**
 * Bolt that splits each incoming line on commas and emits one tuple per word.
 */
public static class SplitBolt extends BaseRichBolt{
    private OutputCollector collector;

    /**
     * Stores the collector used to emit words and ack input tuples.
     *
     * @param stormConf topology configuration (unused)
     * @param context   topology context (unused)
     * @param collector collector for this bolt
     */
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    /**
     * Splits the "line" field on commas and emits every non-empty word.
     *
     * <p>Surrounding whitespace is removed from each word, and empty pieces
     * (from an empty line or from consecutive commas) are dropped so that they
     * are not counted as words downstream.
     *
     * @param input tuple carrying the "line" field
     */
    @Override
    public void execute(Tuple input) {
        String line = input.getStringByField("line");
        for (String token: line.split(",")){
            String word = token.trim();
            if (word.isEmpty()) {
                continue;
            }
            // Anchor the word to its source tuple so that it takes part in
            // tuple tracking whenever the spout emits tracked tuples.
            this.collector.emit(input, new Values(word));
        }
        // A BaseRichBolt has to ack its input explicitly.
        this.collector.ack(input);
    }

    /** Declares the single output field "word". */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
/**
 * Bolt that keeps a running count per word and prints all counts after every
 * received tuple.
 */
public static class WordCountBlot extends BaseRichBolt{
    private OutputCollector collector;

    /** Running count for every word seen by this bolt instance. */
    Map<String, Integer> map = new HashMap<String, Integer>();

    /**
     * Stores the collector so that input tuples can be acked.
     *
     * @param stormConf topology configuration (unused)
     * @param context   topology context (unused)
     * @param collector collector for this bolt
     */
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    /**
     * Processing steps:
     * 1) read the word from the tuple,
     * 2) increment its count,
     * 3) print every word together with its current count.
     *
     * @param input tuple carrying the "word" field
     */
    @Override
    public void execute(Tuple input) {
        // 1) read the word
        String word = input.getStringByField("word");

        // 2) update the running total (a word seen for the first time starts at 0)
        Integer count = map.get(word);
        if (count == null){
            count = 0;
        }
        count ++;
        map.put(word, count);

        // 3) print the complete table
        System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~~");
        for (Map.Entry<String, Integer> entry: map.entrySet()) {
            System.out.println(entry);
        }

        // A BaseRichBolt has to ack its input explicitly.
        this.collector.ack(input);
    }

    /** This bolt is the end of the topology and declares no output fields. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
}
/**
 * Builds the word-count topology (DataSourceSpout -> SplitBolt -> WordCountBlot)
 * and submits it to an in-process local cluster.
 *
 * <p>The local cluster is not shut down here, so the topology keeps running
 * until the process is stopped.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    // Build the topology from the spout and the bolts with TopologyBuilder
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("DataSourceSpout", new DataSourceSpout());
    builder.setBolt("SplitBolt", new SplitBolt()).shuffleGrouping("DataSourceSpout");
    // Group by "word" so that every occurrence of a word reaches the same
    // counting task; with shuffle grouping the totals would be split across
    // tasks as soon as the bolt runs with a parallelism above 1.
    builder.setBolt("WordCountBlot", new WordCountBlot()).fieldsGrouping("SplitBolt", new Fields("word"));

    // Create the local cluster and submit the topology
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("LocalWordCountStormTopology", new Config(), builder.createTopology());
}
}


















