Data guarantees in Spark Streaming with Kafka integration

Making a streaming application fault-tolerant with zero data loss guarantees is the key to ensure better reliability semantics. With Spark streaming providing inbuilt support for Kafka integration, we take a look at different approaches to integrate with kafka with each providing different semantics guarantees.
  1. Receiver based approach
  2. Direct approach
We will discuss both these approaches, their pros and cons and see them at work by building on Twitter Popular hashtags using kafka producer code that we discussed previously.

Receiver based approach:
  • This approach uses receivers implemented using a high level Kafka consumer API to receive data. 
  • By default this data is stored and replicated in two spark executors for fault-tolerance. 
  • The receiver then acknowledges the received data by updating offsets in zookeeper.
  • On receiver failure, the received and yet to be replicated data wont be acknowledged hence making kafka resend the un-acked data once receiver recovers.
This is until failure occurs only on the receiver side.

Checkpointing:
When spark driver fails, executors are killed. This makes data unrecoverable on driver failure as it is stored In-memory in executors. This leads us to a technique called Checkpointing where the all important metadata is stored in HDFS or other fault-tolerant storage.


Metadata includes:

  • Configuration: The configuration on which the streaming application was created
  • DStreams operations: The code that forms the streaming application
  • Incomplete batches: Queued jobs that are yet to be completed
However metadata checkpointing may still not ensure zero data loss. There may be scenarios where
  1. Input data is received and stored in-memory where executor is running
  2. Receiver sends an acknowledgement to input source
  3. Driver suddenly fails as a result of which executors are killed
  4. Buffered data in the executors is lost and cannot be recovered
  5. We are now faced with situation where data is received and acknowledged but lost before being processed

So we look at Write ahead logs which is yet another attempt to ensure at-least once semantics i.e. zero data loss.


Write Ahead Logs:

In order to ensure that there is no data loss even on driver failure, Spark 1.2 came up with Write Ahead Logs (WAL) which when enabled saves the received data into log files on a fault-tolerant file system like HDFS.  

The lifecycle of the received data goes like this...


  • Receiver stores the received data as blocks in-memory of the executors and is also written to Write Ahead Logs (WAL)
  • Metadata is sent to the streaming context in the driver. Metadata includes block location information and offset information of the block in WAL 
  • RDDs and jobs are generated by streaming context for every job interval using the block information. Spark context then runs tasks for these jobs by processing blocks available in-memory of executors
  • The DStream computations are additionally checkpointed into HDFS
When driver fails and is restarted, 
  • Using the checkpointed information, the driver is restarted, contexts are reconstructed and receivers are restarted
  • Block metadata is recovered
  • For batches which failed, the RDDs and corresponding jobs are regenerated using the recovered block metadata
  • Blocks available in the Write Ahead Logs are read when these jobs execute
  • The buffered data which is unsaved to WAL at the time of failure is resent by Kafka source as it is not ack-ed by receiver
 Hence despite driver failures, this approach guarantees at-least once semantics i.e. zero data loss.

Code implementation:
Lets now look at how at-least once semantics can be implemented in Spark Streaming. For this we will improve on the Streaming app that we developed previously to analyze Twitter Popular hashtags using kafka producerWe create a  RecoverableKafkaPopularHashTags scala class that:
  • Enables Write Ahead Logs
  • Takes the checkpoint directory name as argument
  • If checkpoint directory exists then the streaming context is recreated using checkpointed data
  • If checkpoint directory doesn't exit, then a function is called to create a new context
A few pointers from this code:
 StreamingContext.getOrCreate(checkpointDirectory,  
    () => createContext(zkQuorum, group, topics, numThreads, checkpointDirectory))  
  • If the application is started for the first time, checkpoint directory doesn't exit. So createContext function is called which creates a new streaming context, sets up streams and then returns the streaming context object using which start() is called
  • If the application is restarted after failure the streaming context is recreated from the checkpoint data in the checkpoint directory
  • Write Ahead Logs are enabled by setting the property:
 conf.set("spark.streaming.receiver.writeAheadLog.enable", "true")  
Running the app:
Checkpoint directory should be an absolute path in HDFS. But since in our case we are running the app in eclipse, we want the directory to be created within the project. So we just pass the directory name as argument.

You can find the kafkaTwitterProducer code at my github repo.

Run the kafkaTwitterProducer passing the 4 twitter oAuth keys, the kafka topic to subscribe to and the keywords to filter the tweets as arguments:








Now run the RecoverableKafkaPopularHashTags class passing the arguments: 
  • zookeeper hostname
  • consumer group
  • kafka topics
  • number of threads and 
  • checkpoint directory


Since this is the first time we are running the Spark code, it creates a new context. The same can be conformed from the console...


You should see the below output:

In order to see the working of checkpointing and WAL, we need to simulate a driver failure. For this we can simply terminate and restart the RecoverableKafkaPopularHashTags spark job and still keep the kafkaTwitterProducer job running.

The checkpoint directory should also be visible once the project is refreshed in eclipse.

After the spark job is restarted, since the checkpoint directory is already present, it picks up the metadata to check from which point it has to restart processing the data.
     .
     .
     .
     
   Recover already acknowledged data from WAL that is saved to HDFS.

   Once it starts processing from WAL, you should start seeing the      expected output... 

While this method ensures zero data loss, it also has its downsides.
  • Data is replicated twice. Once by kafka and once by WAL (written to HDFS i.e. disk I/O reducing the receiver throughput)
  • Records may be consumed twice on failure. This is due to inconsistencies in offsets being updated in zookeeper upon failure(data may be received by spark but job fails before updating in zookeeper).
To guarantee exactly-once semantics(read data exactly once), a new Direct approach was introduced in Spark 1.3.

Direct Approach (No receivers):
  • Built using Kafka's Simple Consumer API, this approach makes use of the ability to replay data from arbitrary offsets in Kafka
  • In this method the spark driver calculates the range of kafka offsets to be consumed by spark executors in the next micro batch
  • When each batch's jobs are executed, data corresponding to offset ranges is read from kafka just like reading files from HDFS
  • Offsets are saved reliably in checkpoint directory and used to recompute data on recovery from failure
  • Allows for data to be processed exactly once despite failures.
  • No WALs are required as there are no receivers

Advantages:
  • Does not use zookeeper. Consumed offset information is maintained by spark streaming which reduces inconsistencies that occur between data reliably received by spark and offsets tracked by zookeeper
  • No need to create multiple input kafka streams and union them. Spark streaming creates as many RDD partitions as there are kafka streams which is a one to one mapping between kafka streams and spark RDDs and each kafka stream is automatically read in parallel
  • Data is stored only once in kafka and is not replicated like in the receiver based approach
Enough of theory. Lets jump to code...

Code Implementation:
Create a KafkaDirectReciverPopularHashTags class similar to RecoverableKafkaPopularHashTags class that we discussed previously.
Similar to the receivers based approach, this class
  • Takes the checkpoint directory name as argument
  • If checkpoint directory exists then the streaming context is recreated using checkpointed data
  • If checkpoint directory doesn't exit, then a function is called to create a new context
 val ssc = StreamingContext.getOrCreate(checkpointDirectory,  
    () => createContext(brokers, topics, checkpointDirectory))  
  • If the application is started for the first time, checkpoint directory doesn't exit. So createContext function is called which creates a new streaming context, sets up streams and then returns the streaming context object using which start() is called
  • If the application is restarted after failure the streaming context is recreated from the checkpoint data in the checkpoint directory
   val kafkaParams = Map[String, String](  
    "metadata.broker.list" -> brokers,  
    "auto.offset.reset" -> "largest")  
  • Create a map of kafkaParams i.e. the broker list (hostname/ip of the kafka broker) and auto reset of the offset to the "largest" or "smallest" available kafka offset on failure
 val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)  
  • Create the stream using the createDirectStream class passing the streaming context object, kafka params and the topics to consume
  • Rest of the code is same as in the receiver based approach
Running the app:
Checkpoint directory should be an absolute path in HDFS. But since in our case we are running the app in eclipse, we want the directory to be created within the project. So we just pass the directory name as argument.

You can find the kafkaTwitterProducer code at my github repo.

Run the kafkaTwitterProducer passing the 4 twitter oAuth keys, the kafka topic to subscribe to and the keywords to filter the tweets as arguments:

Now run the KafkaDirectReciverPopularHashTags class passing the arguments:

When you run the code, the output remains same as that of receiver based approach. You may terminate the spark app alone and then restart it to see the checkpointing at work.

Checkout the project in my github repo.

You may also like:
Spark streaming part 3: Real time twitter sentiment analysis using kafka

Spark Streaming part 1: Real time twitter sentiment analysis


Spark streaming part 2: Real time twitter sentiment analysis using Flume


Realtime stream processing using Apache Storm - Part 1


Realtime stream processing using Apache Storm and Kafka - Part 2

Labels: , ,