Realtime stream processing using Apache Storm and Kafka - Part 2

Apache Storm can also be integrated with Kafka using a spout implementation that consumes messages published by a Kafka producer. In the previous post we walked through some of the core concepts of Storm, saw how it differs from Spark, and hooked it up with Twitter. Here we will see how a Storm KafkaSpout can be created to consume messages from a KafkaProducer and process them in successive bolts to extract the top words and their counts from the Kafka messages (tweets).

Create a Java maven project and update the following dependencies in the pom.xml:
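The original dependency list isn't reproduced here; a typical set for this stack might look like the following (the group/artifact ids are the standard ones, but the versions are illustrative and should match your cluster):

```xml
<dependencies>
  <!-- versions are illustrative; use ones matching your Storm/Kafka installation -->
  <dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>1.0.1</version>
  </dependency>
  <dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-kafka</artifactId>
    <version>1.0.1</version>
  </dependency>
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.9.0.1</version>
  </dependency>
  <dependency>
    <groupId>org.twitter4j</groupId>
    <artifactId>twitter4j-stream</artifactId>
    <version>4.0.4</version>
  </dependency>
</dependencies>
```

Note that the kafka artifact often pulls in its own log4j/slf4j bindings, which can clash with the ones shipped by storm-core; if you see logger binding conflicts, add exclusions for those transitive dependencies.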


KAFKA PRODUCER:
Create a KafkaTwitterProducer class that creates a messaging queue from twitter using the twitter4j library:
We use the ConfigurationBuilder from the twitter4j library to set the OAuth tokens. A LinkedBlockingQueue stores incoming messages in First In First Out (FIFO) order. The producer continuously polls this queue and, whenever new messages are available, sends them to the Kafka broker.
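The queue-draining pattern can be sketched with plain JDK classes (in the real class a twitter4j StatusListener offers tweets into the queue and a KafkaProducer sends them out; the class and method names below are illustrative):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class ProducerLoopSketch {
    // Drain the queue the way the producer loop does: poll until empty,
    // forwarding each message in FIFO order (here we collect instead of
    // calling producer.send(new ProducerRecord<>(topic, msg))).
    static List<String> drain(LinkedBlockingQueue<String> queue) throws InterruptedException {
        List<String> sent = new ArrayList<>();
        String msg;
        while ((msg = queue.poll(100, TimeUnit.MILLISECONDS)) != null) {
            sent.add(msg); // real code: producer.send(new ProducerRecord<>(topic, msg));
        }
        return sent;
    }

    public static void main(String[] args) throws InterruptedException {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(1000);
        queue.offer("first tweet");  // in the real class the StatusListener offers these
        queue.offer("second tweet");
        System.out.println(drain(queue)); // FIFO order preserved
    }
}
```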

What does each producer configuration setting mean?

  • metadata.broker.list - Legacy setting telling the old producer where to find one or more brokers to determine the leader of a topic
  • bootstrap.servers - List of host:port pairs used to establish the initial connection to the Kafka cluster
  • acks - Number of acknowledgements the leader must receive before the producer considers a request complete
  • retries - A value greater than zero causes the client to resend a record on transient failure
  • batch.size - Maximum size, in bytes, of a batch of records sent to the same partition
  • linger.ms - Small delay added before sending so that more records can be batched together
  • buffer.memory - Total amount of memory available to the producer for buffering records awaiting transmission
  • key.serializer - Serializer class for keys
  • value.serializer - Serializer class for values
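Put together as a Properties object (which would then be passed to new KafkaProducer<>(props)), the settings above might look like this; the values are illustrative:

```java
import java.util.Properties;

public class ProducerConfigSketch {
    // Build the producer configuration described above (values are illustrative).
    static Properties buildConfig() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // initial connection to the cluster
        props.put("acks", "all");             // wait for the leader's full acknowledgement
        props.put("retries", 0);              // > 0 would resend records on transient failure
        props.put("batch.size", 16384);       // max bytes batched per partition before a send
        props.put("linger.ms", 1);            // small delay to let batches fill
        props.put("buffer.memory", 33554432); // total memory for buffering unsent records
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(buildConfig().getProperty("bootstrap.servers"));
    }
}
```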

STORM TOPOLOGY:
Create a class named KafkaTwitterTopology:

  • ZkHosts:- Tracks the Kafka brokers and their partition mapping dynamically via ZooKeeper
  • SpoutConfig:- Subclass of KafkaConfig used to control the KafkaSpout's behaviour (topic, ZooKeeper root and consumer id)
  • SchemeAsMultiScheme:- Wraps a Scheme that defines how the byte[] consumed from Kafka is transformed into a Storm tuple
  • KafkaSpout:- The spout that integrates with Kafka, configured using the SpoutConfig
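Wiring these pieces together might look like the following sketch (storm-kafka 1.x API; the topic, ids, parallelism hints and the WordCounterBolt argument values are illustrative):

```java
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.StringScheme;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.topology.TopologyBuilder;

public class KafkaTwitterTopologySketch {
    public static void main(String[] args) throws Exception {
        BrokerHosts hosts = new ZkHosts("localhost:2181"); // brokers tracked via ZooKeeper
        SpoutConfig spoutConfig = new SpoutConfig(hosts,
                "tweets",               // topic created earlier
                "/tweets",              // ZK root where offsets are stored
                "kafka-twitter-spout"); // consumer id
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); // byte[] -> String tuple

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka-spout", new KafkaSpout(spoutConfig), 1);
        builder.setBolt("word-splitter", new JsonWordSplitterBolt(), 2)
               .shuffleGrouping("kafka-spout");
        builder.setBolt("ignore-words", new IgnoreWordsBolt())
               .shuffleGrouping("word-splitter");
        builder.setBolt("word-counter", new WordCounterBolt(30, 300, 10))
               .shuffleGrouping("ignore-words");

        // Local mode for development:
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("kafka-twitter", new Config(), builder.createTopology());
    }
}
```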


BOLT:
A look at the JsonWordSplitterBolt class:
prepare(Map map, TopologyContext topologyContext, OutputCollector collector)
Called once before the bolt starts processing tuples. It provides the OutputCollector, which is used for emitting tuples from the bolt.
execute(Tuple input)
Receives a tuple from one of the bolt's inputs, reads fields with getValueByField and emits new tuples using emit(input, new Values(lang, word));. The input tuple is passed as the first argument to emit so that the emitted tuple is anchored to the originating tuple.
collector.ack(input)
Once ack is called on the anchor, Storm tells the spout to ack the tuple and drops the anchor. This ensures that each message emitted by the spout is fully processed by all bolts.
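The shape of the bolt can be sketched as follows (Storm 1.x API; the JSON parsing is elided and the field names are assumptions based on StringScheme's default output field):

```java
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class JsonWordSplitterBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map map, TopologyContext context, OutputCollector collector) {
        this.collector = collector; // keep the collector for emitting later
    }

    @Override
    public void execute(Tuple input) {
        String json = input.getStringByField("str"); // "str" is StringScheme's output field
        // In the real bolt these come from parsing the tweet JSON:
        String lang = "en";
        String text = json;
        for (String word : text.split("\\s+")) {
            collector.emit(input, new Values(lang, word)); // anchored to the input tuple
        }
        collector.ack(input); // mark the input tuple as fully processed by this bolt
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("lang", "word"));
    }
}
```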

We then drop a few words that we don't want to count using the IgnoreWordsBolt class.
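The filtering itself is just a set-membership test; a minimal stdlib sketch of that logic (the stop-word list here is illustrative, not the post's actual list):

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class IgnoreWordsSketch {
    // Words we don't want to count; the real bolt simply does not re-emit
    // tuples whose word is in this set.
    static final Set<String> IGNORE =
            new HashSet<>(Arrays.asList("the", "a", "an", "rt", "to", "of"));

    static boolean shouldEmit(String word) {
        return !IGNORE.contains(word.toLowerCase());
    }

    public static void main(String[] args) {
        System.out.println(shouldEmit("storm")); // true: forwarded downstream
        System.out.println(shouldEmit("the"));   // false: dropped
    }
}
```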


Finally, the tuples are sent to the WordCounterBolt, where the word-count logic is applied.


The WordCounterBolt takes three arguments:
logIntervalSec - The interval at which the word counts and the top list are calculated and displayed
clearIntervalSec - The interval after which the top list is cleared
topListSize - The maximum number of words in the top list
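The core of the periodic display step, extracting the top-N words from the accumulated counts, can be sketched with the stdlib alone (the real bolt keeps its counts in a map updated on every execute call):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class TopWordsSketch {
    // Given accumulated word counts, return the top-n words by count -
    // what WordCounterBolt computes every logIntervalSec seconds.
    static List<String> topN(Map<String, Integer> counts, int n) {
        return counts.entrySet().stream()
                .sorted(Map.Entry.<String, Integer>comparingByValue().reversed())
                .limit(n)
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = new HashMap<>();
        counts.put("storm", 5);
        counts.put("kafka", 9);
        counts.put("spark", 2);
        System.out.println(topN(counts, 2)); // [kafka, storm]
    }
}
```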


Launch:
Start the zookeeper server:
/usr/local/zookeeper/bin/zkServer.sh start


Start kafka server:
/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties


Create a topic "tweets" in kafka:
/usr/local/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic tweets


Check if topic has been created:
/usr/local/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181


We can now run the KafkaTwitterProducer and the Storm topology KafkaTwitterTopology in Eclipse in local mode.
The Twitter OAuth tokens, the Kafka topic name and the keywords have to be passed as arguments to the Kafka producer.
Now run the Storm KafkaTwitterTopology to see the list of top words.
Open a separate console for each so that the logs of the Kafka producer and the Storm topology can be viewed separately.

Check out the project in my GitHub repo.


You may also like:
Realtime stream processing using Apache Storm - Part 1

Spark Streaming part 1: Real time twitter sentiment analysis

Spark streaming part 2: Real time twitter sentiment analysis using Flume

Spark streaming part 3: Real time twitter sentiment analysis using kafka

Data guarantees in Spark Streaming with kafka integration
