Make sure you paste the four OAuth tokens in the conf file.
Flume pushes data to its sink at hostname Ubuntu and port 9988. Spark Streaming must be configured with the same hostname and port to receive the input data stream. (Change the hostname and port according to your setup.)
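For reference, the Avro sink section of the Flume conf might look like the fragment below. The agent name TwitterAgent matches the run command later in this post; the sink and channel names (AvroSink, MemChannel) are assumptions for illustration:

```properties
# Avro sink: Flume pushes events to this host/port (match your setup)
TwitterAgent.sinks.AvroSink.type = avro
TwitterAgent.sinks.AvroSink.hostname = Ubuntu
TwitterAgent.sinks.AvroSink.port = 9988
TwitterAgent.sinks.AvroSink.channel = MemChannel
```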
We use FlumeUtils to create a stream, passing the same hostname and port to which Flume is configured to push the Avro stream events.
Tweets are extracted from the Flume events via e.event.getBody.
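Putting those two steps together, a minimal sketch of the receiving side could look like this (the hostname, port, and 10-second batch interval are assumptions matching the setup above):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

val sparkConf = new SparkConf().setMaster("local[6]").setAppName("FlumeTwitter")
val ssc = new StreamingContext(sparkConf, Seconds(10))

// Listen on the same hostname/port that the Flume Avro sink pushes to
val flumeStream = FlumeUtils.createStream(ssc, "Ubuntu", 9988)

// Each Flume event body carries the raw tweet text as bytes
val tweets = flumeStream.map(e => new String(e.event.getBody.array(), "UTF-8"))
```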
The rest of the code is similar to the spark streaming integration with twitter we did in our previous post.
setMaster("local[6]") - Make sure to set the master to local mode with at least 2 threads, since one thread is used for receiving the incoming stream and another for processing it.
We count the popular hashtags with the below code:
val topCounts60 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60))
.map{case (topic, count) => (count, topic)}
.transform(_.sortByKey(false))
The code does a word count of the hashtags over the previous 60 (or 10) seconds, as specified by the window duration in reduceByKeyAndWindow, and sorts them in descending order of count.
reduceByKeyAndWindow is used when we need to apply transformations over data accumulated across previous stream intervals (a sliding window), rather than over the latest batch alone.
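As a sketch, the full hashtag pipeline could read as follows, assuming tweets is the DStream of tweet text and ssc the StreamingContext created earlier. The 10-second slide interval (the optional third argument) and the printing logic are illustrative additions:

```scala
// Split tweets into words and keep only hashtags
val hashTags = tweets.flatMap(_.split(" ")).filter(_.startsWith("#"))

// Count each hashtag over the last 60 seconds, recomputed every 10 seconds
val topCounts60 = hashTags.map((_, 1))
  .reduceByKeyAndWindow(_ + _, Seconds(60), Seconds(10))
  .map { case (topic, count) => (count, topic) }
  .transform(_.sortByKey(false))

// Print the top 10 hashtags for every window
topCounts60.foreachRDD { rdd =>
  println("\nPopular hashtags in the last 60 seconds:")
  rdd.take(10).foreach { case (count, tag) => println(s"$tag ($count tweets)") }
}

ssc.start()
ssc.awaitTermination()
```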
Execute:
Note: Flume requires the Spark app to be listening on the port before it can push events to it. So make sure to run the Spark app before starting Flume.
- Right click on the .scala file > click Run (no need to pass arguments)
- Run flume conf: flume-ng agent -n TwitterAgent -f /usr/local/flume/apache-flume-1.6.0-bin/conf/TwitterAvroSource.conf
Back in your Eclipse console you should see this:
Check out the project in my GitHub repo.
You may also like:
Spark Streaming part 1: Real time twitter sentiment analysis
Spark streaming part 3: Real time twitter sentiment analysis using kafka
Data guarantees in Spark Streaming with kafka integration
Realtime stream processing using Apache Storm - Part 1
Realtime stream processing using Apache Storm and Kafka - Part 2