Realtime stream processing using Apache Storm - Part 1

Choosing a tool for stream processing depends on a few trade-offs: whether we want the data processed in "real time" or "near real time", whether we need "at-least-once" or "exactly-once" semantics for fault tolerance, and whether we need "sub-second" latency or can live with a latency of a "few seconds". Answering these should help us decide between Storm and Spark.

In this post we will look at the innards of Storm as applicable to stream processing, continuing from the streaming analysis using Spark that we discussed previously here, here and here.

Here's Storm and Spark in conversation about what they are capable of:
Storm: I process one incoming event at a time.
Spark: I collect data over a small time window as a micro-batch and then process it.
Storm: I can provide sub-second latency.
Spark: I only provide a latency of a few seconds. But hey, I ensure that data is read exactly once and guarantee better fault tolerance.
Storm: I guarantee that I read data at least once.
Spark: But that may allow duplicate data once you recover from faults.
Storm: Yeah. But that's a compromise you have to make if you need faster processing. Never mind. Let me show you how I do my stuff...

Core concepts:

Topology: The container of the business logic in a real-time application: a graph of spouts and bolts connected by streams.

Streams: The core abstraction in Storm. A stream is an unbounded sequence of tuples that is processed and created in parallel.

Spouts: The sources of streams in a topology. A spout reads tuples from an external source and emits them into the topology.

Bolts: A bolt consumes any number of input streams, applies business logic and may emit new streams. It is where the actual processing takes place.



Let's create a Twitter word count topology that counts the total occurrences of words and prints the top words every few seconds.
You can follow the steps in this link to create your Twitter OAuth tokens.

Create a Java Maven project and update the pom.xml as given below:

Topology:
Now create a TwitterWordCountTopology.java class to submit the "TwitterWordCountStorm" topology.

A look at how the topology is configured:
    TopologyBuilder builder = new TopologyBuilder();  
    builder.setSpout("twitter-spout", new TwitterSampleSpout(consumerKey,  
      consumerSecret, accessToken, accessTokenSecret, keyWords));  
    builder.setBolt("WordSplitterBolt", new WordSplitterBolt(5)).shuffleGrouping("twitter-spout");  
    builder.setBolt("IgnoreWordsBolt", new IgnoreWordsBolt()).shuffleGrouping("WordSplitterBolt");  
    builder.setBolt("WordCounterBolt", new WordCounterBolt(5, 5 * 60, 50)).shuffleGrouping("IgnoreWordsBolt");  
The topology contains a spout and 3 bolts.

Grouping:

How a stream is distributed among the instances of a bolt is decided by the stream grouping, which is chosen on the declarer returned by the setBolt method. Storm offers several groupings; four commonly used ones are:
  • shuffleGrouping: Randomly distributes tuples across the next-stage bolt instances, so that each instance receives a roughly equal share.
  • fieldsGrouping: Partitions the stream by the hash of one or more field values, so tuples with the same values for those fields always go to the same bolt instance.
  • allGrouping: Replicates every tuple to all next-stage bolt instances.
  • globalGrouping: Sends all the tuples to a single next-stage bolt instance.
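
To make the idea behind fieldsGrouping concrete, here is a minimal plain-Java sketch: hash the values of the grouping fields and take the result modulo the number of bolt instances. It is an illustration only, not Storm's internal code, and the names GroupingSketch and targetTask are made up for this post.

```java
import java.util.Arrays;
import java.util.List;

// Illustration only: not Storm's internal implementation.
class GroupingSketch {

    // Picks a bolt instance index from the values of a tuple's grouping fields.
    // Equal field values always map to the same index.
    static int targetTask(List<Object> groupingValues, int numTasks) {
        // floorMod keeps the index non-negative even when the hash is negative.
        return Math.floorMod(groupingValues.hashCode(), numTasks);
    }

    public static void main(String[] args) {
        int numTasks = 4;
        for (String word : Arrays.asList("storm", "spark", "storm")) {
            List<Object> key = Arrays.<Object>asList(word);
            System.out.println(word + " -> instance " + targetTask(key, numTasks));
        }
    }
}
```

Both occurrences of "storm" land on the same instance, which is what a per-word counter needs when the counting bolt runs with more than one instance.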

Spout:
Now let's look at the implementation of the TwitterSampleSpout class.

This spout connects to the Twitter sample stream using the twitter4j library.

Lifecycle of a tuple:
  • Storm requests a tuple from the TwitterSampleSpout by calling the nextTuple method on the spout.
  • The spout uses the SpoutOutputCollector provided in the open method to emit a tuple to the output stream.
Implementation of the nextTuple method:
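    public void nextTuple() {
        // Take the next tweet off the queue without blocking
        Status ret = queue.poll();
        if (ret == null) {
            // Nothing new yet: back off briefly instead of spinning
            Utils.sleep(50);
        } else {
            // Emit the tweet as a single-field tuple
            _collector.emit(new Values(ret));
        }
    }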

  • The incoming tweets are stored in a queue, which is polled continuously to check for new tweets.
  • If no new tweet is found, the spout sleeps for 50 milliseconds before polling the queue again.
  • If a new tweet is found, it is emitted into the stream, where a bolt picks it up for further processing.
The output schema is declared in the declareOutputFields method; the bolts downstream use the declared field names to read values from the tuples.
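
The polling pattern described above can be tried out without Storm or Twitter. The sketch below is a stand-in under stated assumptions: a LinkedBlockingQueue of strings plays the role of the tweet queue, and a plain list collects what the spout would have emitted. PollingSketch and pollOnce are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

// Stand-in for the spout's polling loop; no Storm or twitter4j types involved.
class PollingSketch {
    // In the spout this queue is filled as tweets arrive; here we fill it by hand.
    final LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
    // Stands in for the SpoutOutputCollector.
    final List<String> emitted = new ArrayList<>();

    // Mirrors nextTuple(): emit one item if available, otherwise back off briefly.
    // Returns true when something was emitted.
    boolean pollOnce() {
        String next = queue.poll(); // non-blocking; null when the queue is empty
        if (next == null) {
            try {
                Thread.sleep(50); // avoid spinning on an empty queue
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
            }
            return false;
        }
        emitted.add(next);
        return true;
    }

    public static void main(String[] args) {
        PollingSketch spout = new PollingSketch();
        spout.queue.offer("first tweet");
        System.out.println(spout.pollOnce());
        System.out.println(spout.pollOnce());
        System.out.println(spout.emitted);
    }
}
```

The short sleep matters because Storm calls nextTuple in a loop; without it, an empty queue would keep a CPU core busy doing nothing.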

Bolts:
Let's look at the implementation of the WordSplitterBolt.java class.

Bolt Structure:
prepare(Map map, TopologyContext topologyContext, OutputCollector collector)
Called once, just before the bolt starts processing tuples. It provides the OutputCollector that the bolt uses to emit tuples.
execute(Tuple input)
Receives a tuple from one of the bolt's inputs, reads its values using getValueByField, and emits new tuples using emit(input, new Values(lang, word)). The input tuple is passed as the first argument to emit so as to anchor the emitted tuples to the input tuple.
collector.ack(input)
Acknowledges that this bolt has finished with the input tuple. Once every tuple anchored to the original spout tuple has been acked, Storm tells the spout to ack that tuple. This ensures that each message sent by a spout is fully processed by all bolts.
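
How does Storm know when a tuple is fully processed? Storm's documentation on guaranteed message processing describes the acker as keeping, for each spout tuple, a single 64-bit value: the XOR of every tuple id that has been created or acked in that tuple's tree. When the value returns to zero, everything that was created has also been acked. The toy class below illustrates only that bookkeeping idea in plain Java. AckTrackerSketch and its methods are hypothetical names, not Storm classes, and the ids in the usage note are hand-picked, whereas Storm uses random 64-bit ids.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of XOR-based ack tracking; illustration only, not Storm code.
class AckTrackerSketch {
    // spout tuple id -> XOR of all tuple ids still awaiting an ack
    private final Map<Long, Long> pending = new HashMap<>();

    // Called when the spout emits a tuple: the tree starts with just that tuple.
    void spoutEmit(long rootId) {
        pending.put(rootId, rootId);
    }

    // Called when a bolt acks ackedId after emitting tuples anchored to it.
    // Returns true when the whole tree rooted at rootId is fully processed.
    boolean ack(long rootId, long ackedId, long... anchoredChildIds) {
        Long current = pending.get(rootId);
        if (current == null) {
            throw new IllegalArgumentException("unknown spout tuple: " + rootId);
        }
        long value = current ^ ackedId;   // the acked id cancels itself out
        for (long child : anchoredChildIds) {
            value ^= child;               // each new child must be acked later
        }
        if (value == 0L) {
            pending.remove(rootId);       // nothing outstanding: tree complete
            return true;
        }
        pending.put(rootId, value);
        return false;
    }
}
```

For example, after spoutEmit(0xA), a splitter that acks 0xA while emitting anchored tuples 0xB and 0xC leaves the tree open; it completes only once both 0xB and 0xC have been acked in turn.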

We then drop a few words that we don't want to count, using the IgnoreWordsBolt.java class.
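
Taken together, the splitter and the ignore-list filter amount to simple string handling. The sketch below shows one way to write that logic in plain Java, outside any bolt. It assumes that the constructor argument 5 of WordSplitterBolt is a minimum word length, and the ignore list shown is invented for illustration. WordFilterSketch and its members are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;

// Plain-Java stand-in for the split and ignore steps; not the project's bolts.
class WordFilterSketch {
    // Assumed ignore list; the real project may use different words.
    static final Set<String> IGNORED =
            new HashSet<>(Arrays.asList("https", "there", "about"));

    // Lower-cases the text, replaces punctuation with spaces, and keeps words
    // that are at least minWordLength long and not on the ignore list.
    static List<String> keepWords(String tweetText, int minWordLength) {
        String cleaned = tweetText.toLowerCase(Locale.ROOT)
                .replaceAll("\\p{Punct}", " ");
        List<String> kept = new ArrayList<>();
        for (String word : cleaned.trim().split("\\s+")) {
            if (word.length() >= minWordLength && !IGNORED.contains(word)) {
                kept.add(word);
            }
        }
        return kept;
    }
}
```

In the topology these are two separate bolts, so each step can be scaled and replaced independently; the sketch merges them only to keep the example short.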

Finally, the tuples are sent to the WordCounterBolt.java bolt, where the word count logic is applied.
The WordCounterBolt takes 3 arguments:
logIntervalSec - The time interval over which to calculate and display the word count and top list
clearIntervalSec - The time interval after which the top list should be cleared
topListSize - The maximum size of the top list of words
  • As in every bolt, the business logic is applied in the execute method.
  • Each word emitted by the IgnoreWordsBolt is added to a map along with its count.
  • Every logIntervalSec seconds we calculate the word count and the top list of words with their occurrences, sorted in ascending order. This is done in the publishTopList() method.
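
The counting and top-list steps above can be sketched as follows. This is a simplified stand-in, not the project's publishTopList() method: it leaves out the timers and only shows a count map plus a top list returned in ascending order of count. TopListSketch, add, topList and clear are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified stand-in for the counting bolt's state; no Storm types involved.
class TopListSketch {
    private final Map<String, Long> counts = new HashMap<>();

    // What execute() would do for each incoming word.
    void add(String word) {
        counts.merge(word, 1L, Long::sum);
    }

    // Returns the topListSize most frequent words, least frequent first.
    Map<String, Long> topList(int topListSize) {
        List<Map.Entry<String, Long>> entries = new ArrayList<>(counts.entrySet());
        // Ascending by count; ties broken alphabetically for a stable result.
        entries.sort(Map.Entry.<String, Long>comparingByValue()
                .thenComparing(Map.Entry.<String, Long>comparingByKey()));
        int keep = Math.max(0, Math.min(topListSize, entries.size()));
        Map<String, Long> top = new LinkedHashMap<>();
        for (Map.Entry<String, Long> e
                : entries.subList(entries.size() - keep, entries.size())) {
            top.put(e.getKey(), e.getValue());
        }
        return top;
    }

    // What the bolt would do once clearIntervalSec has elapsed.
    void clear() {
        counts.clear();
    }
}
```

In a bolt, add would be called from execute for every word, while topList and clear would be triggered by comparing the current time against logIntervalSec and clearIntervalSec.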

Launch:
We can now run the Storm topology in Eclipse in local mode.
The Twitter OAuth tokens and the keywords have to be passed as arguments.
Run the TwitterWordCountTopology.java class. You should now see the rolling word counts.


REFERENCES:
http://storm.apache.org/releases/current/Tutorial.html
https://github.com/xetorthio/getting-started-with-storm/blob/master/ch05Bolts.asc
https://github.com/davidkiss/storm-twitter-word-count


Check out the project in my GitHub repo.


