求和案例

完整源码

需求:

1+2+3+4+5+6+....=???

实现方案:

Spout发送数据作为input
使用Bolt来处理业务:求和
将结果输出到控制台

拓扑设计:

DataSourceSpout -->SumBolt

1. 定义数据源代码——Spout

  1. 建立一个类继承BaseRichSpout,并重写opennextTupledeclareOutputFields方法
  2. 定义SpoutOutputCollector,因为nextTuple方法会用到。

     private  SpoutOutputCollector spoutOutputCollector;
  3. 编写open方法

    this.spoutOutputCollector = spoutOutputCollector;
  4. 编写nextTuple方法,并创建一个整型number 模拟获取数据。

    this.spoutOutputCollector.emit(new Values(++number));
  5. 编写declareOutputFields方法声明输出字段

    outputFieldsDeclarer.declare(new Fields("num"));

#### 完整代码:

/**
     * Spout需要继承BaseRiceSpout
     * 数据源需要产生数据并发射
     */
    public static class DataSourceSpout extends BaseRichSpout{

        private  SpoutOutputCollector spoutOutputCollector;

        /**
         * 初始化方法 ,是会被调用一次
         * @param map 配置参数
         * @param topologyContext 上下文
         * @param spoutOutputCollector
         */

        @Override
        public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
            this.spoutOutputCollector = spoutOutputCollector;
        }

        int number = 0 ;

        /**
         * 会产生数据,在生产上肯定是从消息队列中获取数据
         * 死循环,一直不停的执行
         */
        @Override
        public void nextTuple() {
            this.spoutOutputCollector.emit(new Values(++number));

            System.out.println("Spout:"+number);
            //防止数据产生太快
            Utils.sleep(1000);
        }

        /**
         * 声明输出字段
         * @param outputFieldsDeclarer
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            /**
             * 因为上面的this.spoutOutputCollector.emit(new Values(++number));  number是一个
             * 所以num也是一个
             * 如果是:
             * this.spoutOutputCollector.emit(new Values(++number,number2));
             * outputFieldsDeclarer.declare(new Fields("num","num2"));
             */
            outputFieldsDeclarer.declare(new Fields("num"));
        }
    }

#### 注意:

因为上面的this.spoutOutputCollector.emit(new Values(number++));  number是一个,所以num也是一个,
如果是:
this.spoutOutputCollector.emit(new Values(number++,number2));
那么:
outputFieldsDeclarer.declare(new Fields("num","num2"));

2. 定义Bolt

  1. 建立一个类继承BaseRichBolt,并重写prepareexecutedeclareOutputFields方法(和Spout类似
  2. 不需要像Spout一样声明OutputCollector,因为本案例中不需要往下执行了。在一个Bolt中就可以操作了。
  3. 编辑execute方法,通过tuple.getIntegerByField("num")获取上一步传入的值。num与上一步定义的一样
  4. 不需要往下一个Bolt走,无需操作declareOutputFields方法

完整代码:

    /**
     * 定义Bolt
     */
    public static class SumBolt extends BaseRichBolt {
        /**
         * 初始化方法
         * @param map 配置
         * @param topologyContext 上下文
         * @param outputCollector 往下传需要用到
         */
        @Override
        public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        }

        int sum = 0 ;
        /**
         * 也是死循环 ,职责:获取spout发送过来的数据,
         * @param tuple
         */
        @Override
        public void execute(Tuple tuple) {
            //Bolt中获取值,可以根据index获取,也可以根据上一个环节中定义的Field获取(建议使用Field)
            Integer value = tuple.getIntegerByField("num");
            sum +=value;
            System.out.println("[Bolt]Sum:"+sum);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        }
    }

3.Topology提交功能

使用本地模式:

本地模式可模拟过程中的Storm集群,对于开发和测试topologies非常有用。在本地模式下运行topologies 类似于运行topologies 。(在本地运行时会发现日志打印出了关于ZooKeeper的信息,说明它在本地通过ZooKeeper模拟了Storm集群)

相关文档

http://storm.apache.org/releases/1.2.3/Local-mode.html (1.1.1文档没了,1.2.3差不多)

http://storm.apache.org/releases/1.2.3/Running-topologies-on-a-production-cluster.html

步骤:

  1. 创建一个本地Storm集群

    LocalCluster cluster = new LocalCluster();
  2. 关联Spout和Bolt

    • TopologyBuilder根据SpoutBolt来构建出Topology
    • Strom中任何一个作业都是通过Topology的方式提交的
    • Topology中需要指定SpoutBolt执行顺序
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("DataSourceSpout",new DataSourceSpout());
    builder.setBolt("SumBolt",new SumBolt()).shuffleGrouping("DataSourceSpout");

SumBolt会去DataSourceSpout中拿数据,所以使用.shuffleGrouping("DataSourceSpout");

  1. 使用cluster.submitTopology()方法将拓扑提交到集群

    cluster.submitTopology("LocalSumStormTopology",new Config(),builder.createTopology());

完整代码:

/**
     *  Topology提交功能
     */
    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("DataSourceSpout",new DataSourceSpout());
        builder.setBolt("SumBolt",new SumBolt()).shuffleGrouping("DataSourceSpout");

        /**
         * 创建一个本地的storm集群,本地模式不需要搭建Storm集群
         */
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("LocalSumStormTopology",new Config(),builder.createTopology());
    }

运行:

E:\DEVELOP\JDK8\bin\java.exe "-javaagent:E:\DEVELOP\JetBrains\IntelliJ IDEA 
......
......
12953 [main] INFO  o.a.s.s.o.a.z.s.ZooKeeperServer - Server environment:java.io.tmpdir=C:\Users\love3\AppData\Local\Temp\
12953 [main] INFO  o.a.s.s.o.a.z.s.ZooKeeperServer - Server environment:java.compiler=<NA>
12953 [main] INFO  o.a.s.s.o.a.z.s.ZooKeeperServer - Server environment:os.name=Windows 10
12953 [main] INFO  o.a.s.s.o.a.z.s.ZooKeeperServer - Server environment:os.arch=amd64
12953 [main] INFO  o.a.s.s.o.a.z.s.ZooKeeperServer - Server environment:os.version=10.0
12953 [main] INFO  o.a.s.s.o.a.z.s.ZooKeeperServer - Server environment:user.name=love3
12953 [main] INFO  o.a.s.s.o.a.z.s.ZooKeeperServer - Server 
......
......
......
70856 [Thread-24-__system-executor[-1 -1]] INFO  o.a.s.d.executor - Preparing bolt __system:(-1)
[Bolt]Sum:1
70859 [Thread-24-__system-executor[-1 -1]] INFO  o.a.s.d.executor - Prepared bolt __system:(-1)
Spout:2
[Bolt]Sum:3
Spout:3
[Bolt]Sum:6
Spout:4
[Bolt]Sum:10
Spout:5
[Bolt]Sum:15
Spout:6
[Bolt]Sum:21
Spout:7
[Bolt]Sum:28
......

标签: Storm

添加新评论