Spark streaming part 3: Real time twitter sentiment analysis using Kafka

This is a follow-up to the previous post, where we integrated Spark Streaming with Flume to consume live tweets from Flume events.
In this post we will create a Kafka producer that ingests data from the Twitter Streaming API, and then transform the data using Spark Streaming.

For this we make use of the twitter4j Java library, which provides access to the Twitter Streaming API using OAuth tokens. To get started, create your OAuth tokens here and a Scala project by following this post.

Kafka is a publish-subscribe based messaging system that is distributed, partitioned, replicated and fault tolerant.
  • It maintains feeds of messages in categories called topics
  • Kafka producers publish messages to topics
  • Kafka consumers subscribe to topics and process the published feed
  • A Kafka cluster consists of one or more servers called brokers
Kafka provides a Java API for producers and consumers. We will only look at creating a Kafka producer for ingesting data into the cluster and a Spark Streaming app to transform the Kafka feed.

Create a Java Maven project and add the Kafka client and twitter4j (core and stream) dependencies to the pom.xml.
Create a KafkaTwitterProducer class that uses the KafkaProducer class from the Kafka producer API to send messages to a topic.
We use the ConfigurationBuilder from the twitter4j library to set the OAuth tokens. A LinkedBlockingQueue is used to store messages in a queue in First-In-First-Out (FIFO) order. The producer continuously polls this queue to retrieve any new messages and sends them to the Kafka broker.
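
A minimal sketch of that logic, written in Scala for consistency with the rest of this series (the tutorial's own class is Java, but the Kafka and twitter4j APIs are the same; the broker address, queue capacity and argument order here are illustrative):

    import java.util.Properties
    import java.util.concurrent.LinkedBlockingQueue

    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
    import twitter4j._
    import twitter4j.conf.ConfigurationBuilder

    object KafkaTwitterProducer {
      def main(args: Array[String]): Unit = {
        // OAuth tokens, topic and keywords are passed as run arguments
        val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret, topic) = args.take(5)
        val keywords = args.drop(5)

        // FIFO queue buffering tweets between the twitter4j listener thread and the producer loop
        val queue = new LinkedBlockingQueue[Status](1000)

        // Set the OAuth tokens with twitter4j's ConfigurationBuilder
        val cb = new ConfigurationBuilder()
          .setOAuthConsumerKey(consumerKey)
          .setOAuthConsumerSecret(consumerSecret)
          .setOAuthAccessToken(accessToken)
          .setOAuthAccessTokenSecret(accessTokenSecret)

        // Listen to the Twitter Streaming API, filtered on the given keywords
        val twitterStream = new TwitterStreamFactory(cb.build()).getInstance()
        twitterStream.addListener(new StatusListener {
          override def onStatus(status: Status): Unit = queue.offer(status)
          override def onDeletionNotice(notice: StatusDeletionNotice): Unit = {}
          override def onTrackLimitationNotice(numberOfLimitedStatuses: Int): Unit = {}
          override def onScrubGeo(userId: Long, upToStatusId: Long): Unit = {}
          override def onStallWarning(warning: StallWarning): Unit = {}
          override def onException(ex: Exception): Unit = ex.printStackTrace()
        })
        twitterStream.filter(new FilterQuery().track(keywords: _*))

        // Producer configuration (see the note on these settings below)
        val props = new Properties()
        props.put("bootstrap.servers", "localhost:9092")
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        val producer = new KafkaProducer[String, String](props)

        // Continuously poll the queue and send each tweet's text to the Kafka topic
        while (true) {
          val status = queue.take()
          producer.send(new ProducerRecord[String, String](topic, status.getText))
        }
      }
    }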

What does each of the producer configuration settings mean?
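In the sketch above, bootstrap.servers tells the producer which broker to contact for its initial connection, while key.serializer and value.serializer specify how message keys and values are converted to bytes before being sent (plain strings here); other settings such as acks and retries are left at their defaults.
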
We now move to the Spark Streaming code, which creates a DStream from the Kafka messages every 2 seconds and determines the popular hashtags over 60-second and 10-second window intervals.
Create a KafkaSparkPopularHashTags.scala object in the Scala project that was created initially.
Kafka messages arrive as (key, value) pairs, from which the values form the lines DStream.
The rest of the code is similar to the Spark Streaming integration with Flume we did in our previous post.
setMaster("local[6]") - Make sure to set the master to local mode with at least 2 threads, since one thread is used for collecting the incoming stream and the others for processing it.

We count the popular hashtags with the below code:
   val topCounts60 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60))
     .map { case (topic, count) => (count, topic) }
     .transform(_.sortByKey(false))

The code does a word count of the hashtags over the previous 60 seconds, as specified in reduceByKeyAndWindow, and sorts them in descending order; the 10-second window uses an identical pipeline with Seconds(10).

reduceByKeyAndWindow is used when we have to apply transformations to data accumulated over previous stream intervals, rather than just the current 2-second batch.
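
To surface the results, a short sketch in the style of Spark's TwitterPopularTags example that prints the top ten hashtags for the 60-second window and then starts the streaming context (the 10-second window is handled the same way):

    // Print the ten most popular hashtags seen in the last 60 seconds
    topCounts60.foreachRDD(rdd => {
      val topList = rdd.take(10)
      println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count()))
      topList.foreach { case (count, tag) => println("%s (%s tweets)".format(tag, count)) }
    })

    ssc.start()
    ssc.awaitTermination()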

Run:
Start the zookeeper server:
/usr/local/zookeeper/bin/zkServer.sh start

Start kafka server:
/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties

Create a topic "tweets" in Kafka:
/usr/local/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic tweets

Check that the topic has been created (the output should list tweets):
/usr/local/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181

Run the KafkaTwitterProducer:
In the Kafka producer Java project, right click on KafkaTwitterProducer.java > Run configurations > Arguments.
Add the 4 Twitter OAuth tokens, followed by the Kafka topic to publish to and the keywords, separated by spaces.
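For example (placeholder tokens; the trailing words are whatever keywords you want to track):
<consumerKey> <consumerSecret> <accessToken> <accessTokenSecret> tweets spark kafka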


Click Run.
You should now see your Kafka console populated with the tweets and hashtags.

Open "New Console View" in eclipse:

Now run the Spark Streaming code:
In the Scala project, right click on KafkaSparkPopularHashTags.scala > Run configurations > Arguments.
Add the ZooKeeper hostname, consumer group name, Kafka topic to consume and number of threads:
localhost:2181 spark-streaming-consumer-group tweets 4

Now click Run.
You should now see the top hashtags over the previous 60/10 seconds:

Note: if you do not see the Scala console, keep pressing the "Display selected console" button until the Scala console appears.

Check out the project in my GitHub repo.


You may also like:
Spark Streaming part 1: Real time twitter sentiment analysis

Spark streaming part 2: Real time twitter sentiment analysis using Flume

Data guarantees in Spark Streaming with kafka integration

Realtime stream processing using Apache Storm - Part 1

Realtime stream processing using Apache Storm and Kafka - Part 2
