MapReduce VS Spark - Aadhaar dataset analysis

Continuing the MapReduce vs Spark series, in which we discussed problems such as wordcount, secondary sort and inverted index, we now take up the use case of analyzing a dataset from Aadhaar - a unique identity issued to all resident Indians. The issuing authority - UIDAI - provides a catalog of downloadable datasets collected at the national level. Our dataset of interest is "Enrollments processed in detail", which can be downloaded from this link.

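Problem Statement:
Using the dataset, find the number of identities generated per state, the number of identities generated per enrollment agency, and the top 10 districts with maximum identities generated for both male and female.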

Solution:
Once the dataset is downloaded, we first look at the Hive query that counts the number of unique identities generated in each state. As Hive runs MapReduce jobs internally, we then walk through the MapReduce code required to compute the state-wise count of identities generated.

Hive:
Create a Hive table with the table property 'skip.header.line.count'='1', as the dataset contains a header.

When we run the query to get the number of Aadhaars generated for each state, Hive launches two MapReduce jobs. The first job computes the total Aadhaars generated for each state. The second job sorts the states by the number of Aadhaars generated, in descending order.

Hive Output:



MapReduce:
To get an idea of how Hive executes queries internally using MapReduce, we create 2 MapReduce jobs similar to the Hive execution. The first job counts the number of Aadhaar identities generated for each state. To sort the output in descending order, we need a second job.

Job 1:

NumUIDMapper.java:
The map method reads the input file line by line. The file contains a header in its first line, which has byte offset 0, so we skip the line when the mapper key, i.e. the byte offset, is 0. The remaining lines are processed to write (state, Aadhaars generated) as a key value pair to the output.
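A minimal sketch of the map logic described above, written in plain Java without Hadoop dependencies so that it can be run on its own. The class name NumUIDMapSketch and the column positions STATE_COL and GENERATED_COL are assumptions made for illustration; check the positions against the header of the actual CSV. The naive split on commas also assumes that no field contains a quoted comma.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;
import java.util.Optional;

class NumUIDMapSketch {
    // Assumed column positions (hypothetical); verify against the CSV header.
    static final int STATE_COL = 2;
    static final int GENERATED_COL = 8;

    /**
     * Mimics a single map() call: byteOffset plays the role of the mapper key
     * and line the role of the mapper value.
     */
    static Optional<Map.Entry<String, Integer>> map(long byteOffset, String line) {
        if (byteOffset == 0) {
            return Optional.empty(); // header line: emit nothing
        }
        String[] fields = line.split(",");
        if (fields.length <= GENERATED_COL) {
            return Optional.empty(); // too few columns: skip the record
        }
        String state = fields[STATE_COL].trim();
        int generated = Integer.parseInt(fields[GENERATED_COL].trim());
        return Optional.of(new SimpleEntry<>(state, generated));
    }
}
```

In a real mapper the same check would be made on the LongWritable key, and the pair would be written to the context instead of being returned.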



NumUIDReducer.java:
The reduce method reads (state, Aadhaars generated) as the input key value pair. It sums up the number of Aadhaars generated for each key, i.e. each state, and writes the aggregated key value pair to the output.
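The aggregation can be sketched in plain Java as follows. The class NumUIDReduceSketch and its run method are hypothetical; run only stands in for the shuffle phase, which in Hadoop groups the mapper output by key before reduce is called.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class NumUIDReduceSketch {
    // Mimics one reduce() call: sums every count seen for a single state.
    static long reduce(String state, Iterable<Integer> counts) {
        long total = 0;
        for (int c : counts) {
            total += c;
        }
        return total;
    }

    // Stand-in for the shuffle: group mapper output by state, then reduce each group.
    static Map<String, Long> run(List<Map.Entry<String, Integer>> mapOutput) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (Map.Entry<String, Integer> e : mapOutput) {
            grouped.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(e.getValue());
        }
        Map<String, Long> totals = new TreeMap<>();
        grouped.forEach((state, counts) -> totals.put(state, reduce(state, counts)));
        return totals;
    }
}
```

The total is kept as a long here as a precaution, since state-level sums can grow well beyond the size of any single record's count.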



Job 2:
Since the output from the 1st job has to be sorted in descending order, we create another MapReduce job that uses a custom sorting comparator.

SortMapper.java:
The output from the 1st job contains (state, count) as key value pairs. As sorting happens only on keys, the SortMapper class swaps each pair to (count, state) so that the records can be sorted by count.


SortComparator.java:
Since the default sorting comparator sorts in ascending order, we need to implement a custom sorting comparator which sorts in descending order.
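The idea behind a descending comparator can be sketched in plain Java as below. The class name DescendingCountComparator is hypothetical. In a Hadoop job the equivalent class would typically extend WritableComparator and be registered on the job with setSortComparatorClass, but the comparison itself follows the same pattern.

```java
import java.util.Comparator;

class DescendingCountComparator implements Comparator<Long> {
    @Override
    public int compare(Long a, Long b) {
        // Swapping the arguments reverses the natural (ascending) order.
        return Long.compare(b, a);
    }
}
```

Using Long.compare with swapped arguments avoids the overflow that can occur when two counts are subtracted to produce the comparison result.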


SortReducer.java:
Once the custom sorting comparator has sorted the intermediate key value pairs, the reducer swaps them back from (count, state) to the original format (state, count).
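Putting the three pieces of Job 2 together, the swap, sort and swap-back flow could look like the plain Java sketch below. SortJobSketch and sortByCountDesc are hypothetical names, and a TreeMap with a reversed comparator stands in for the shuffle-time sort that Hadoop performs on keys.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class SortJobSketch {
    static List<Map.Entry<String, Long>> sortByCountDesc(Map<String, Long> stateCounts) {
        // Mapper step: swap (state, count) to (count, state).
        // The reversed TreeMap keeps the counts, now the keys, in descending order.
        TreeMap<Long, List<String>> byCount = new TreeMap<>(Comparator.reverseOrder());
        for (Map.Entry<String, Long> e : stateCounts.entrySet()) {
            byCount.computeIfAbsent(e.getValue(), k -> new ArrayList<>()).add(e.getKey());
        }
        // Reducer step: swap each pair back to (state, count), preserving the sorted order.
        List<Map.Entry<String, Long>> sorted = new ArrayList<>();
        for (Map.Entry<Long, List<String>> e : byCount.entrySet()) {
            for (String state : e.getValue()) {
                sorted.add(Map.entry(state, e.getKey()));
            }
        }
        return sorted;
    }
}
```

States that share the same count end up under one key, which mirrors how a reducer receives all values for a key in a single call.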


Driver.java:
Apart from setting the usual driver configurations, we need to create 2 job objects. The 1st job object - stateWiseCount - uses the state-wise count mapper and reducer classes, whereas the 2nd job object - sort - uses the mapper, reducer and sorting comparator classes that sort the output in descending order.


Build jar and execute:

Run the MR job by passing 3 arguments i.e. input path, output path for 1st MR job and output path for 2nd MR job.
yarn jar MRAadhaarAnalysis-0.0.1-SNAPSHOT.jar com.stdatalabs.MRAadhaarAnalysis.Driver /user/cloudera/UIDAI-ENR-DETAIL-20170308.csv MRUid_op MRUidSorted_op

Output:

Spark:

We deviate from our usual approach of using Spark RDD transformations to do our job and take a look at Spark SQL to process the dataset using Dataframes.
A dataframe is an immutable collection of data distributed across nodes, similar to an RDD. Dataframes resemble tables in an RDBMS in that the data is organized into named columns. Hive tables can be read as dataframes, and any existing RDD can be converted to a dataframe by imposing a structure on it.

UIDStats.scala:
Once the dataset is loaded as a dataframe, we can run queries on top of it, which makes it easier to meet each of the 3 objectives from the problem statement: the number of identities generated per state, the number of identities generated per enrollment agency, and the top 10 districts with maximum identities generated for both male and female.



stateWiseDF - The result of the 1st query is the number of identities generated for each state, returned as a dataframe and written into the Hive table state_wise_count under the Hive database uid.
maxEnrolmentAgencyDF - The result of the 2nd query is the number of identities generated for each enrollment agency, returned as a dataframe and written into the Hive table agency_wise_count under the Hive database uid.
districtWiseGenderCountDF - The result of the 3rd query is the top 10 districts with maximum identities generated for male and female, returned as a dataframe and written into the Hive table district_wise_gndr_count under the Hive database uid.

Build jar and execute:
spark-submit --class com.stdatalabs.SparkAadhaarAnalysis.UIDStats --packages com.databricks:spark-csv_2.10:1.5.0 --master yarn-client SparkAadhaarAnalysis-0.0.1-SNAPSHOT.jar /user/cloudera/UIDAI-ENR-DETAIL-20170308.csv

On successful execution, 3 output tables are created in Hive.

Output:

state_wise_count: 

agency_wise_count:

district_wise_gndr_count:

The source code for this project can be found at this GitHub repo.
