MapReduce VS Spark - Secondary Sort Example

Problem Statement:
Dataset is available at this link.
Solution:
The dataset can be sorted on both lastname and firstname by implementing a secondary sorting technique which sorts based on values.

MapReduce:
MapReduce sorts records based on keys but not on values. But when the scenario demands sorting on both keys and values, secondary sorting technique helps us achieve this using a composite key implementation.

Person.java:
Person class is a composite key implementing WritableComparable interface. It contains the firstname and lastname which are set in the map() method.


PersonMapper.java:
map() method receives lastname and firstname as the key - value pair and outputs the {lastname, firstname} as a Person which is a composite key and the firstname as the value.


PersonPartitioner.java:
Since the mapper outputs a custom key, we need a custom partitioner implementation to partition data only on lastname so that records with the same lastname goes to the same reducer.

PersonSortingComparator.java:
The composite key also requires a custom sorting comparator to sort them before being sent to the reducer.
PersonGroupingComparator.java:
The grouping comparator groups together the keys with the list of values that are sent to reducer input.


PersonReducer.java:
reducer() method receives Person and firstname as the key-value pair and prints the lastname and firstname after applying secondary sort.
Driver.java:
We use the KeyValueTextInputFormat which reads the records as a key-value pair and use the property "key.value.separator.in.input.line", "," to override the default delimiter which is tab.
Few additional properties that should be specified are:
job.setMapOutputKeyClass(Person.class) - Set output key class to Person composite key explicitly
job.setPartitionerClass(PersonPartitioner.class) - Set custom partitioner class
job.setSortComparatorClass(PersonSortingComparator.class) - Set custom sorting class
job.setGroupingComparatorClass(PersonGroupingComparator.class) - Set grouping comparator class
job.setNumReduceTasks(2) - Set reducer tasks to 2 as output has to be sent to 2 files

Execute:
yarn jar MRSecondarySort-0.0.1-SNAPSHOT.jar com.stdatalabs.MRSecondarySort.Driver Names.csv MRSecondarySort_op

Output:


   File1:
   File2:


Spark:
The code complexity and number of lines of code can be drastically reduced in Spark as it is easier to implement algorithms without all the boiler plate code that we see in mapreduce.

pairsRDD = personRDD.map(_.split(",")).map { k => (k(0), k(1)) } - Maps keys and values from
input RDD
numReducers = 2 - Set number of reducers to 2
listRDD = pairsRDD.groupByKey(numReducers).mapValues(iter => iter.toList.sortBy(r => r)) -
Group by key and create a list of values from each key
resultRDD = listRDD.flatMap {case (label, list) => {list.map((label, _))}} - Apply flatMap
transformation to convert key, List(value1, value2) to key value pair


Execute:
spark-submit --class com.stdatalabs.SparkSecondarySort.Driver --master yarn-client SparkSecondarySort-0.0.1-SNAPSHOT.jar /user/cloudera/Names.csv /user/cloudera/SparkSecondarySort_op

Output:

   File1:

   File2: 

The dataset and the complete source code can be found at this github repo.

You may also like:
MapReduce VS Spark - Aadhaar dataset analysis

MapReduce VS Spark - Wordcount Example

MapReduce VS Spark - Inverted Index example

Spark Streaming part 1: Real time twitter sentiment analysis

Labels: , ,