Too young too simple and too naive



15 Aug 2013


Recently, our team has been considering how to process our log data in a centralized way. We want to process the data in real time, and our logs will grow large in the near future, so Storm may be the best technical solution.

So I will begin researching Storm to see whether it meets our requirements and to solve problems that may appear in the future. I will record my learning process in this blog and also provide some code. I hope it will be helpful for you.

What is Storm

OK, this is the first question to ask about anything new, and I think it is the hardest one to answer.

nathanmarz says: Storm is a distributed realtime computation system. Similar to how
Hadoop provides a set of general primitives for doing batch processing, Storm provides
a set of general primitives for doing realtime computation. Storm is simple, can be used
with any programming language, and is a lot of fun to use!

First, some useful resources:

Storm is mostly written in Clojure and provides a complete Java API. You can write applications in almost any language; of course, you need an adapter to connect to it. If you want to read the Storm source code you must know Clojure and Java, and I think reading it is necessary.

Some Concepts You Should Know

In this post I will just show you, step by step, how to install a Storm cluster, how to develop a topology, and how to submit a topology to the cluster.

Install a Storm cluster

Before we install a Storm cluster, let us see how a Storm cluster runs.

                      -----------             ------------
                      |zookeeper|             |supervisor|
                      -----------             ------------
   ----------         -----------             ------------
   | Nimbus |  <----> |zookeeper|  <------->  |supervisor|
   ----------         -----------             ------------
                      -----------             ------------
                      |zookeeper|             |supervisor|
                      -----------             ------------

Nimbus is the master node and each supervisor is a worker node. ZooKeeper is the important piece that records the state of the master and worker nodes.

OK, let's begin the installation. The recommended versions are ZeroMQ 2.1.7, JZMQ, Java 6, and Python 2.6.6.

$ wget

$ wget

$ wget

Configure and start ZooKeeper

$ vim conf/zoo.cfg


$ echo '1' > /storm/zookeeper-data/myid    # run on host xx.xx.xx.xx

$ echo '2' > /storm/zookeeper-data/myid    # run on host oo.oo.oo.oo

$ bin/ start
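
The number written to each myid file has to match that host's server.N entry in conf/zoo.cfg. A minimal sketch of that relationship follows, assuming the two placeholder hosts above and the conventional 2888/3888 peer and election ports (the ports are an assumption, they are not stated in this post). ZooCfgSketch is a hypothetical helper, not part of ZooKeeper; it only prints what you would type.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of ZooKeeper: it only builds the lines
// you would place in conf/zoo.cfg and the myid value for each host.
class ZooCfgSketch {

    // Builds one "server.N=host:2888:3888" line per host; N starts at 1.
    // Ports 2888 and 3888 are assumed (conventional peer/election ports).
    static List<String> serverLines(List<String> hosts) {
        List<String> lines = new ArrayList<String>();
        for (int i = 0; i < hosts.size(); i++) {
            lines.add("server." + (i + 1) + "=" + hosts.get(i) + ":2888:3888");
        }
        return lines;
    }

    // The myid file on a host must contain the N of that host's server.N line.
    static int myidFor(List<String> hosts, String host) {
        int index = hosts.indexOf(host);
        if (index < 0) {
            throw new IllegalArgumentException("unknown host: " + host);
        }
        return index + 1;
    }

    public static void main(String[] args) {
        List<String> hosts = Arrays.asList("xx.xx.xx.xx", "oo.oo.oo.oo");
        for (String line : serverLines(hosts)) {
            System.out.println(line);
        }
        for (String host : hosts) {
            System.out.println(host + " -> myid " + myidFor(hosts, host));
        }
    }
}
```

If the myid value and the server.N entry disagree, the ZooKeeper node cannot identify itself within the ensemble, so it is worth checking both before starting the servers.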

Write the storm.yaml config file

storm.zookeeper.servers:
    - "xx.xx.xx.xx"
    - "oo.oo.oo.oo"

storm.local.dir: "/storm/storm-state"

nimbus.host: "xx.xx.xx.xx"

supervisor.slots.ports:
    - 6701
    - 6702
    - 6703
    - 6704
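
To make the meaning of these settings concrete, here is a rough plain-Java mirror of the config. It assumes the standard Storm keys storm.zookeeper.servers, storm.local.dir, nimbus.host and supervisor.slots.ports, and that the second address is the Nimbus host. StormYamlSketch is a hypothetical class for illustration only and does not read storm.yaml. The point it shows is that each port in supervisor.slots.ports is one worker slot on a supervisor.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java mirror of the storm.yaml settings, for illustration only.
class StormYamlSketch {

    // Same values as the config above; the key names are the assumed
    // standard Storm keys, the addresses are the post's placeholders.
    static Map<String, Object> settings() {
        Map<String, Object> conf = new LinkedHashMap<String, Object>();
        conf.put("storm.zookeeper.servers", Arrays.asList("xx.xx.xx.xx", "oo.oo.oo.oo"));
        conf.put("storm.local.dir", "/storm/storm-state");
        conf.put("nimbus.host", "xx.xx.xx.xx");
        conf.put("supervisor.slots.ports", Arrays.asList(6701, 6702, 6703, 6704));
        return conf;
    }

    // Each port is one worker slot on a supervisor, so the cluster offers
    // (ports per supervisor) x (number of supervisors) slots in total.
    static int totalWorkerSlots(Map<String, Object> conf, int supervisors) {
        List<?> ports = (List<?>) conf.get("supervisor.slots.ports");
        return ports.size() * supervisors;
    }

    public static void main(String[] args) {
        // Three supervisors, as drawn in the cluster diagram.
        System.out.println(totalWorkerSlots(settings(), 3));
    }
}
```

With four ports per supervisor and the three supervisors from the diagram, the cluster would have room for 12 worker processes.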

Run Some Daemons

$ nohup bin/storm nimbus &

$ nohup bin/storm supervisor &

$ nohup bin/storm ui &

Finally, open xx.xx.xx.xx:8080 in your browser to view the Storm UI.

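Develop a HelloWorld topology locally

Before we write a topology, we should know at least three concepts: topologies, bolts, and spouts. A spout is a source of tuples, a bolt processes tuples and may emit new ones, and a topology is the graph that wires spouts and bolts together.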
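
To get a feel for how the three concepts fit together, here is a toy model in plain Java. The Spout and Bolt interfaces below are hypothetical and are NOT Storm's API; they only show the data flow on one thread, without any cluster, parallelism, or reliability.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: these interfaces are hypothetical, not Storm's API.
class MiniTopologySketch {

    // A spout is a source of tuples.
    interface Spout {
        String nextTuple();
    }

    // A bolt consumes one tuple and emits a new one.
    interface Bolt {
        String execute(String tuple);
    }

    // A topology wires one spout to a chain of bolts. Here it pulls a
    // fixed number of tuples instead of running forever.
    static List<String> run(Spout spout, List<Bolt> bolts, int tuples) {
        List<String> results = new ArrayList<String>();
        for (int i = 0; i < tuples; i++) {
            String tuple = spout.nextTuple();
            for (Bolt bolt : bolts) {
                tuple = bolt.execute(tuple);
            }
            results.add(tuple);
        }
        return results;
    }

    // One "Hello" spout feeding two chained bolts that append " World".
    static List<String> demo() {
        Spout hello = new Spout() {
            public String nextTuple() { return "Hello"; }
        };
        Bolt world = new Bolt() {
            public String execute(String tuple) { return tuple + " World"; }
        };
        List<Bolt> bolts = new ArrayList<Bolt>();
        bolts.add(world);
        bolts.add(world);
        return run(hello, bolts, 2);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In real Storm the spout and the bolts run as many parallel tasks on different machines, and the topology keeps running until it is killed; the toy model leaves all of that out.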

OK, now I will write a simple topology whose bolts just append " World" to the end of each tuple and also write the content to a file so we can see the result.


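import java.util.Map;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class HelloWorldSpout extends BaseRichSpout {

    SpoutOutputCollector collector;

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("hello"));
    }

    public void open(Map map, TopologyContext context, SpoutOutputCollector spoutOutputCollector) {
        collector = spoutOutputCollector; // keep the collector for nextTuple()
    }

    public void nextTuple() {
        collector.emit(new Values("Hello")); // Storm calls this repeatedly
    }
}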


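import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class HelloWorldBolt extends BaseBasicBolt {
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String msg = tuple.getString(0);
        System.out.println("=====before write file=====");
        try {
            File file = new File("/tmp/storm.txt");
            if (!file.exists()) {
                file.createNewFile();
            }
            FileWriter fw = new FileWriter(file.getAbsoluteFile(), true); // append mode
            BufferedWriter bw = new BufferedWriter(fw);
            bw.write(msg + "\n");
            bw.close(); // flush and release the file handle
        } catch (IOException e) {
            e.printStackTrace();
        }
        System.out.println("=====after write file=====");
        collector.emit(new Values(msg + " World"));
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("world"));
    }
}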


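import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;

public class HelloWorldTopology {

    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        // The third argument is the parallelism hint for each component.
        builder.setSpout("Hello", new HelloWorldSpout(), 12);
        builder.setBolt("World", new HelloWorldBolt(), 12).shuffleGrouping("Hello");
        builder.setBolt("WorldTwo", new HelloWorldBolt(), 12).shuffleGrouping("World");

        Config config = new Config();

        if (args != null && args.length > 0) {
            // A topology name was given: submit to the real cluster.
            StormSubmitter.submitTopology(args[0], config, builder.createTopology());
        } else {
            // No arguments: run in an in-process local cluster for testing.
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("Hello-World-BaiJian", config, builder.createTopology());
        }
    }
}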
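
The topology connects its components with shuffleGrouping, which spreads tuples randomly across the tasks of the receiving bolt while keeping the load even. The simulation below is only a sketch of that idea in plain Java, not Storm's implementation; ShuffleGroupingSketch and its distribute method are hypothetical names, and the 12 tasks mirror the parallelism hint used above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Illustrative simulation only; this is not Storm's implementation.
class ShuffleGroupingSketch {

    // Assigns each tuple to one task: walk through a shuffled list of
    // task ids and reshuffle whenever the list is exhausted.
    // Returns how many tuples each task received.
    static int[] distribute(int tuples, int tasks, long seed) {
        Random random = new Random(seed);
        List<Integer> order = new ArrayList<Integer>();
        for (int t = 0; t < tasks; t++) {
            order.add(t);
        }
        int[] counts = new int[tasks];
        int position = tasks; // forces a shuffle before the first tuple
        for (int i = 0; i < tuples; i++) {
            if (position == tasks) {
                Collections.shuffle(order, random);
                position = 0;
            }
            counts[order.get(position)]++;
            position++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // 12 tasks, matching the parallelism hint in the topology.
        int[] counts = distribute(1200, 12, 42L);
        for (int task = 0; task < counts.length; task++) {
            System.out.println("task " + task + ": " + counts[task]);
        }
    }
}
```

Because every pass over the shuffled list visits each task once, the order is random but each task ends up with the same number of tuples whenever the tuple count is a multiple of the task count.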

Test it on your local machine and then submit it to the cluster

$ mvn package

$ mvn exec:java -Dexec.classpathScope=compile \

$ storm jar /path/to/jar TopologyName

$ storm kill TopologyName

My Source Code
