This is a follow-up to the previous post, where we integrated Spark Streaming with Flume to consume live tweets from Flume events.
In this post we will create a Kafka producer that ingests data from the Twitter Streaming API, and then transform the data using Spark Streaming.
For this we make use of the twitter4j Java library, which provides access to the Twitter Streaming API using OAuth tokens. To get started, create your OAuth tokens (link here) and a Scala project by following this post.
Kafka is a publish-subscribe based messaging system that is distributed, partitioned, replicated and fault tolerant.
- It maintains feeds of messages in categories called topics
- Kafka producers publish messages to topics
- Kafka consumers subscribe to topics and process the published feed
- A Kafka cluster consists of one or more servers, each of which is called a broker
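The topic/producer/consumer relationship above can be sketched with a toy in-memory pub-sub model. This is only an illustration of the vocabulary; real Kafka persists, partitions and replicates the message log across brokers:

```java
import java.util.*;

// Toy in-memory publish-subscribe model illustrating topics,
// producers and consumers (no persistence, partitioning or replication).
public class MiniPubSub {
    private final Map<String, List<String>> topics = new HashMap<>();

    // A producer publishes a message to a named topic.
    public void publish(String topic, String message) {
        topics.computeIfAbsent(topic, t -> new ArrayList<>()).add(message);
    }

    // A consumer subscribes to a topic and reads its feed in publish order.
    public List<String> subscribe(String topic) {
        return topics.getOrDefault(topic, Collections.emptyList());
    }

    public static void main(String[] args) {
        MiniPubSub bus = new MiniPubSub();
        bus.publish("tweets", "#spark is trending");
        bus.publish("tweets", "#kafka rocks");
        System.out.println(bus.subscribe("tweets"));
    }
}
```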
Kafka provides a Java API for both producers and consumers. We will only look at creating a Kafka producer for ingesting data into the cluster, and a Spark Streaming app to transform the Kafka feed.
Create a java maven project and update the pom.xml with the dependencies given below:
Create a KafkaTwitterProducer class that uses the KafkaProducer class from the Kafka producer API to send messages to a topic.
We use the ConfigurationBuilder from the twitter4j library to set the OAuth tokens. A LinkedBlockingQueue stores incoming messages in First In, First Out (FIFO) order. The producer continuously polls this queue and sends any new messages it retrieves to the Kafka broker.
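The queue-and-poll pattern can be sketched with plain JDK classes. This is a simplification: in the real producer, twitter4j's status listener offers each incoming tweet onto the queue, and the loop runs until the stream is closed, sending each message to Kafka instead of collecting it locally:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class QueuePollSketch {
    public static void main(String[] args) throws InterruptedException {
        // twitter4j's StatusListener would offer incoming tweets onto this queue
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(1000);
        queue.offer("tweet 1");
        queue.offer("tweet 2");

        // The producer loop: poll the queue and handle whatever arrives, in FIFO order.
        // In the real producer the body would be producer.send(new ProducerRecord<>(topic, msg)).
        List<String> sent = new ArrayList<>();
        String msg;
        while ((msg = queue.poll(100, TimeUnit.MILLISECONDS)) != null) {
            sent.add(msg);
        }
        System.out.println(sent);
    }
}
```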
What does each producer configuration setting mean?
- metadata.broker.list - Where the (old) producer API can find one or more brokers in order to determine the leader for a topic; superseded by bootstrap.servers in the new producer API
- bootstrap.servers - List of host:port pairs used to establish the initial connection to the Kafka cluster
- acks - Number of acknowledgements the leader must receive before the producer considers a request complete
- retries - A value greater than zero causes the client to resend a record on failure
- batch.size - Maximum size, in bytes, of records batched together per partition
- linger.ms - Small artificial delay before sending, giving the producer time to batch more records together
- buffer.memory - Total amount of memory available to the producer for buffering records
- key.serializer - Serializer class for the key
- value.serializer - Serializer class for the value
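Put together, the Properties object passed to the KafkaProducer constructor looks roughly like this. The broker address and the values below are common defaults for a local single-broker setup, not requirements; verify them against your own cluster:

```java
import java.util.Properties;

public class ProducerConfigSketch {
    // Build the configuration for a new-API KafkaProducer<String, String>.
    public static Properties producerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // initial connection to the cluster
        props.put("acks", "all");                          // wait for the full set of acknowledgements
        props.put("retries", 0);                           // > 0 would resend records on failure
        props.put("batch.size", 16384);                    // bytes batched together per partition
        props.put("linger.ms", 1);                         // small delay to fill batches
        props.put("buffer.memory", 33554432);              // total memory for buffering records
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        // In the real producer: new KafkaProducer<String, String>(producerProps())
        System.out.println(producerProps().getProperty("bootstrap.servers"));
    }
}
```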
We now move to the Spark Streaming code, which creates a DStream from Kafka messages every 2 seconds and determines the popular hashtags over 60-second and 10-second window intervals.
Create a KafkaSparkPopularHashTags.scala object in the scala project that was created initially.
Kafka messages arrive as (key, value) pairs, and the values form the lines DStream.
The rest of the code is similar to the Spark Streaming integration with Flume from our previous post.
setMaster("local[6]") - Make sure to set the master to local mode with at least 2 threads, since one thread is used for receiving the incoming stream and the others for processing it.
We count the popular hashtags with the below code:
val topCounts60 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60))
.map{case (topic, count) => (count, topic)}
.transform(_.sortByKey(false))
The code does a word count of the hashtags over the previous 60/10 seconds, as specified in reduceByKeyAndWindow, and sorts them in descending order of count.
reduceByKeyAndWindow is used when we need to apply transformations over data accumulated across previous stream intervals.
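Conceptually, the windowed count is just a word count over the union of the last few 2-second batches, sorted descending. A plain-Java sketch of the same idea (no Spark, just to illustrate what reduceByKeyAndWindow computes over a window):

```java
import java.util.*;
import java.util.stream.Collectors;

public class WindowedCountSketch {
    // Count hashtags across all batches in the window, sorted by count descending.
    public static List<Map.Entry<String, Long>> topHashTags(List<List<String>> windowBatches) {
        return windowBatches.stream()
                .flatMap(List::stream)                      // union of all batches in the window
                .collect(Collectors.groupingBy(t -> t, Collectors.counting()))
                .entrySet().stream()
                .sorted(Map.Entry.<String, Long>comparingByValue().reversed())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Each inner list is one 2-second batch; the window spans all of them
        List<List<String>> window = Arrays.asList(
                Arrays.asList("#spark", "#kafka"),
                Arrays.asList("#spark", "#spark"));
        System.out.println(topHashTags(window)); // [#spark=3, #kafka=1]
    }
}
```

Spark does this incrementally and distributed across the cluster; the sketch only shows the arithmetic of "count within the window, then sort".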
Run:
Start the zookeeper server:
/usr/local/zookeeper/bin/zkServer.sh start
Start kafka server:
/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties
Create a topic "tweets" in kafka:
/usr/local/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic tweets
Check if topic has been created:
/usr/local/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181
Run the KafkaTwitterProducer:
In the Kafka producer Java project, right click on KafkaTwitterProducer.java > Run configurations > Arguments
Add the 4 Twitter OAuth tokens, followed by the Kafka topic to publish to and the filter keywords, separated by spaces.
Click Run.
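A sketch of how the producer's main method might unpack those arguments. The ordering here (four tokens, then the topic, then keywords) is this post's convention for the run configuration, not anything fixed by Kafka or twitter4j:

```java
import java.util.Arrays;

public class ProducerArgsSketch {
    // Expected: consumerKey consumerSecret accessToken accessTokenSecret topic keyword...
    public static String[] parse(String[] args) {
        if (args.length < 5) {
            throw new IllegalArgumentException(
                "Usage: <consumerKey> <consumerSecret> <accessToken> <accessTokenSecret> <topic> [keywords...]");
        }
        return args; // tokens = args[0..3], topic = args[4], keywords = args[5..]
    }

    public static void main(String[] args) {
        String[] a = parse(new String[]{"ck", "cs", "at", "ats", "tweets", "spark", "kafka"});
        String topic = a[4];
        String[] keywords = Arrays.copyOfRange(a, 5, a.length);
        System.out.println(topic + " " + Arrays.toString(keywords));
    }
}
```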
You should now see your Kafka console populated with the tweets and hashtags.
Open "New Console View" in eclipse:
Now run the Spark Streaming code:
In the Scala project, right click on KafkaSparkPopularHashTags.scala > Run configurations > Arguments
Add the ZooKeeper host:port, consumer group name, Kafka topic to consume and number of threads:
localhost:2181 spark-streaming-consumer-group tweets 4
Now click Run.
You should now see the top hashtags over the previous 60/10 seconds:
Note: if you do not see the Scala console, keep pressing the "Display selected console" button until it appears.
Check out the project in my GitHub repo.
You may also like:
Spark Streaming part 1: Real time twitter sentiment analysis
Spark streaming part 2: Real time twitter sentiment analysis using Flume
Data guarantees in Spark Streaming with kafka integration
Realtime stream processing using Apache Storm - Part 1
Realtime stream processing using Apache Storm and Kafka - Part 2