Word Count

Full source code

Requirement:

Read data from a specified directory and count how many times each word occurs.

Implementation plan:

  1. A Spout reads the data from the specified directory and serves as the input for the downstream Bolts.
  2. A Bolt splits each input line into words, using a comma as the delimiter.
  3. A Bolt performs the final word-count aggregation.
  4. Output the results.

File operations are involved; for simplicity, commons-io is used:

    <dependency>
      <groupId>commons-io</groupId>
      <artifactId>commons-io</artifactId>
      <version>2.4</version>
    </dependency>

Create a file named test.txt under E:\stormtest as our test data.

Its contents:

    a,b,c,d
    a,b,c
    a,c
    a,d
    f,b

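Before wiring up the topology, it helps to know what result to expect. Below is a minimal plain-Java sketch (no Storm involved; the class name WordCountCheck is just for illustration) that applies the same split-by-comma and count logic to the test lines above:

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class WordCountCheck {
        public static void main(String[] args) {
            // The same lines as in E:\stormtest\test.txt
            List<String> lines = Arrays.asList("a,b,c,d", "a,b,c", "a,c", "a,d", "f,b");
            Map<String, Integer> counts = new HashMap<>();
            for (String line : lines) {
                for (String word : line.split(",")) {
                    counts.put(word, counts.getOrDefault(word, 0) + 1);
                }
            }
            // Expected result: a=4, b=3, c=3, d=2, f=1
            System.out.println(counts);
        }
    }
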
Topology design: DataSourceSpout ==> SplitBolt ==> CountBolt

1. Define the data source: DataSourceSpout

Create DataSourceSpout extending BaseRichSpout and override open, nextTuple, and declareOutputFields:

    public static class DataSourceSpout extends BaseRichSpout{
        private SpoutOutputCollector spoutOutputCollector;

        @Override
        public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
            this.spoutOutputCollector = spoutOutputCollector;
        }

        @Override
        public void nextTuple() {
            // List all .txt files in the directory (recursively) and emit every line as a tuple
            Collection<File> files = FileUtils.listFiles(new File("E:\\stormtest"), new String[]{"txt"}, true);
            for (File file : files) {
                try {
                    List<String> lines = FileUtils.readLines(file);
                    for (String words : lines) {
                        spoutOutputCollector.emit(new Values(words));
                    }
                    // nextTuple() is called in a loop, so rename each processed file
                    // to keep it from being read and emitted again
                    FileUtils.moveFile(file, new File(file.getAbsolutePath() + System.currentTimeMillis()));
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }

        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            outputFieldsDeclarer.declare(new Fields("lines"));
        }
    }

2. Define SplitBolt

Create SplitBolt extending BaseRichBolt and override prepare, execute, and declareOutputFields:

    public static class SplitBolt extends BaseRichBolt{

        private OutputCollector outputCollector;

        @Override
        public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
            this.outputCollector = outputCollector;
        }

        @Override
        public void execute(Tuple tuple) {
            String line = tuple.getStringByField("lines");
            String[] words = line.split(",");
            for (String word : words){
                outputCollector.emit(new Values(word));
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            outputFieldsDeclarer.declare(new Fields("word"));
        }
    }

3. Define CountBolt

Create CountBolt extending BaseRichBolt and override prepare, execute, and declareOutputFields:

    public static class CountBolt extends BaseRichBolt {

        // In-memory word counts kept by this bolt task
        private Map<String, Integer> map = new HashMap<>();

        @Override
        public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {

        }

        @Override
        public void execute(Tuple tuple) {
            String word = tuple.getStringByField("word");
            Integer count = map.get(word);
            if (count == null) {
                count = 0;
            }
            count++;
            map.put(word, count);

            // Print the running counts after each tuple
            System.out.println("=====");
            Set<Map.Entry<String, Integer>> entrySet = map.entrySet();
            for (Map.Entry<String, Integer> entry : entrySet) {
                System.out.println(entry);
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

        }
    }

4. Submit the topology

    TopologyBuilder topologyBuilder = new TopologyBuilder();
    topologyBuilder.setSpout("DataSourceSpout", new DataSourceSpout());
    topologyBuilder.setBolt("SplitBolt", new SplitBolt()).shuffleGrouping("DataSourceSpout");
    topologyBuilder.setBolt("CountBolt", new CountBolt()).shuffleGrouping("SplitBolt");

    LocalCluster localCluster = new LocalCluster();
    localCluster.submitTopology("LocalWordCountStormTopology", new Config(), topologyBuilder.createTopology());
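
For completeness, here is a sketch of how these pieces can be assembled into one runnable class. The import paths assume Storm 1.x+ (org.apache.storm); DataSourceSpout, SplitBolt, and CountBolt are the static inner classes defined in the sections above, and the class name is only illustrative.

    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.topology.TopologyBuilder;

    public class LocalWordCountStormTopology {

        // DataSourceSpout, SplitBolt and CountBolt from the previous sections
        // are defined here as public static inner classes.

        public static void main(String[] args) throws Exception {
            // Wire up the topology: spout -> split bolt -> count bolt
            TopologyBuilder topologyBuilder = new TopologyBuilder();
            topologyBuilder.setSpout("DataSourceSpout", new DataSourceSpout());
            topologyBuilder.setBolt("SplitBolt", new SplitBolt()).shuffleGrouping("DataSourceSpout");
            topologyBuilder.setBolt("CountBolt", new CountBolt()).shuffleGrouping("SplitBolt");

            // Run in an in-process local cluster, which is enough for this test
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("LocalWordCountStormTopology", new Config(), topologyBuilder.createTopology());
        }
    }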

5. Note

In this example, nextTuple() is called over and over again, so every file must be renamed once it has been processed; otherwise the same file would be read and emitted repeatedly, which would corrupt the word counts.
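
Note also that when the directory is empty, nextTuple() still runs in a tight loop and burns CPU. A common mitigation (not part of the original code, shown here only as a hedged variant) is to sleep briefly at the end of each call. The sketch below is essentially the same nextTuple() with a sleep added at the end, assuming org.apache.storm.utils.Utils (Storm 1.x+) is available:

        @Override
        public void nextTuple() {
            Collection<File> files = FileUtils.listFiles(new File("E:\\stormtest"), new String[]{"txt"}, true);
            for (File file : files) {
                try {
                    List<String> lines = FileUtils.readLines(file);
                    for (String words : lines) {
                        spoutOutputCollector.emit(new Values(words));
                    }
                    // Rename the file so it is not read and counted again
                    FileUtils.moveFile(file, new File(file.getAbsolutePath() + System.currentTimeMillis()));
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            // Sleep briefly so an empty directory does not turn this into a busy loop
            Utils.sleep(1000);
        }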
