Thursday 14 January 2016

Spark: Reading Avro serialized Data

Avro being the serialization format of choice in the Hadoop ecosystem and highly prevalent in legacy MapReduce/Hive data pipelines, it becomes necessary to be able to read and process Avro serialized data in Spark.

Below is the complete code snippet, along with a description, to read Avro data and output all unique PARTN_NBR ids along with their count of occurrences.

SparkConf sparkConf = new SparkConf().setAppName("RDD-Usecase1");
JavaSparkContext sc = new JavaSparkContext(sparkConf);

In the above code snippet, we are initializing the sparkConf object and setting the name of the application. In the next line, we use the Spark configuration object, sparkConf, to initialize the Spark application context, sc. Here, JavaSparkContext is the Java-friendly wrapper around the Scala-implemented SparkContext.

As of Spark 1.5, the thumb rule for SparkContext is that there can't be multiple SparkContext instances in the same JVM. Implementation-wise, when the SparkContext constructor is called, it ensures that no other SparkContext instance is running: it throws an exception if a running context is detected and logs a warning if another thread is constructing a SparkContext.


JavaPairRDD<AvroKey,NullWritable> records = sc.newAPIHadoopFile(avroFilePath, AvroKeyInputFormat.class, AvroKey.class, NullWritable.class, new Configuration());

Here, the variable avroFilePath holds the string value of the HDFS path of the input Avro file. An important point to note is that this approach doesn't support reading partitioned Avro data.
SparkContext's read function, "newAPIHadoopFile", is a generic read function with five input parameters:
1) HDFS path of the input file
2) InputFormat class
3) Key class
4) Value class
5) Hadoop Configuration instance
The output of the function is a JavaPairRDD instance which abstracts the input data as an RDD of key-value pairs.
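For reference, below is a rough sketch of the imports that the snippets in this post rely on; exact packages may vary slightly with your Spark and Avro versions.

import java.util.List;

import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapreduce.AvroKeyInputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;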

JavaPairRDD<String, Integer> ones = records.mapToPair(new PairFunction<Tuple2<AvroKey,NullWritable>, String, Integer>() {
    @Override
    public Tuple2<String, Integer> call(Tuple2<AvroKey, NullWritable> record) throws Exception {
        return new Tuple2<String, Integer>((String) ((GenericRecord)(record._1().datum())).get("PARTN_NBR"), 1);
    }
});

"mapToPair" is the standard RDD transformation function to extract key-value pairs of a RDD. Input parameter is the overridden implementation of the call function of the interface, PairFunction.
"call" function implementation has logic on how to read avro serialized object and extract one of the field, "PARTN_NBR". The input to call function is the "record" object which is avro serialized key-value object. "record._1" gives the avro-serialized key object and "record._2" gives avro-serialized value object. "datum()" function cast the object to GenericRecord instance from which we can extract any field by giving it as parameter to get() function.
Output object, Tuple2's value is integer,1 keeping with the program logic.
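As an aside, on Java 8 the same transformation can be written more compactly with a lambda. This is only a sketch that replaces the anonymous class above with identical logic:

// Equivalent to the mapToPair block above, written as a Java 8 lambda
JavaPairRDD<String, Integer> ones = records.mapToPair(
        record -> new Tuple2<String, Integer>((String) ((GenericRecord) record._1().datum()).get("PARTN_NBR"), 1));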

JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer i1, Integer i2) {
        return i1 + i2;
    }
});

"reduceByKey" is the standard RDD transformation function whose working is similar to reducer. It is used to aggregate data on key.

List<Tuple2<String, Integer>> output = counts.collect();

for (Tuple2<?,?> tuple : output) {
    System.out.println(tuple._1() + ": " + tuple._2());
}

"collect" is the standard RDD action function. It will result in the trigger of all the above transformation functions and collect the final output in the in-memory data structure.
The output can be printed on console or saved in a file.
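For example, instead of collecting to the driver and printing, the result could be written straight to HDFS. A minimal sketch, where outputPath is a hypothetical HDFS path string:

// Writes one part file per partition of "counts" under outputPath (hypothetical path variable)
counts.saveAsTextFile(outputPath);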

sc.stop();
sc.close();

It is mandatory to stop and close the SparkContext, as this closes and clears the complete application context and releases all the resources.

Wednesday 6 January 2016

Configuring spark history server for running on Yarn in CDH

In Spark-on-YARN mode, each running Spark application on YARN launches its own web UI, which can be accessed from the YARN Resource Manager UI via the "tracking url" link. This web UI has all the data on the running Spark application like the event timeline, jobs, stages, tasks, task metrics, etc.
With the default configuration, we can only see this web UI for running jobs. To enable the same for completed jobs, the Spark history server has to be started and configured.
The Spark history server is used to maintain and visualize the event logs of Spark applications after they have completed running on YARN.

I tested this on CDH 5.3 and 5.5, which have Spark versions 1.3 and 1.5 respectively.

1) Test if spark-history-server is running or not
$ /etc/init.d/spark-history-server status
If it is not running, start it using
$ /etc/init.d/spark-history-server start

2) Configuring spark-history-server
We need to know two configuration properties of spark-history-server:
- spark.history.fs.logDirectory : the directory where the history server expects the application event logs
- spark.history.ui.port : the port on which it runs

These properties are configured in file "/etc/default/spark".
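The relevant entries in this file look roughly like the following sketch (the other variables in /etc/default/spark are omitted here):

export SPARK_HISTORY_SERVER_WEBUI_PORT=18088
export SPARK_HISTORY_SERVER_LOG_DIR=hdfs:///user/spark/applicationHistory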


So we know that SPARK_HISTORY_SERVER_WEBUI_PORT is 18088 and SPARK_HISTORY_SERVER_LOG_DIR is hdfs:///user/spark/applicationHistory.

If we want to change any of these properties, we can change them in this file and restart the spark-history-server.

PS: There is a quicker way to find the values of these spark-history-server properties:
$ ps -ef | grep HistoryServer
mapred    2595     1  0 01:54 ?        00:01:54 /usr/java/jdk1.7.0_67-cloudera/bin/java -Dproc_historyserver -Xmx1000m -Dhadoop.log.dir=/var/log/hadoop-mapreduce -Dhadoop.log.file=hadoop.log -Dhadoop.home.dir=/usr/lib/hadoop -Dhadoop.id.str= -Dhadoop.root.logger=INFO,console -Djava.library.path=/usr/lib/hadoop/lib/native -Dhadoop.policy.file=hadoop-policy.xml -Djava.net.preferIPv4Stack=true -Dhadoop.log.dir=/var/log/hadoop-mapreduce -Dhadoop.log.file=hadoop.log -Dhadoop.root.logger=INFO,console -Dhadoop.id.str=mapred -Dhadoop.log.dir=/var/log/hadoop-mapreduce -Dhadoop.log.file=hadoop.log -Dhadoop.home.dir=/usr/lib/hadoop -Dhadoop.id.str= -Dhadoop.root.logger=INFO,console -Djava.library.path=/usr/lib/hadoop/lib/native -Dhadoop.policy.file=hadoop-policy.xml -Djava.net.preferIPv4Stack=true -Dhadoop.log.dir=/var/log/hadoop-mapreduce -Dhadoop.log.file=mapred-mapred-historyserver-quickstart.cloudera.log -Dhadoop.root.logger=INFO,console -Dmapred.jobsummary.logger=INFO,JSA -Dhadoop.security.logger=INFO,NullAppender org.apache.hadoop.mapreduce.v2.hs.JobHistoryServer
spark     4174     1  0 01:55 ?        00:03:02 /usr/java/jdk1.7.0_67-cloudera/bin/java -cp /etc/spark/conf/:/usr/lib/spark/lib/spark-assembly-1.5.0-cdh5.5.0-hadoop2.6.0-cdh5.5.0.jar:/etc/hadoop/conf/:/usr/lib/spark/lib/spark-assembly.jar:/usr/lib/hadoop/lib/*:/usr/lib/hadoop/*:/usr/lib/hadoop-hdfs/lib/*:/usr/lib/hadoop-hdfs/*:/usr/lib/hadoop-mapreduce/lib/*:/usr/lib/hadoop-mapreduce/*:/usr/lib/hadoop-yarn/lib/*:/usr/lib/hadoop-yarn/*:/usr/lib/hive/lib/*:/usr/lib/flume-ng/lib/*:/usr/lib/paquet/lib/*:/usr/lib/avro/lib/* -Dspark.history.fs.logDirectory=hdfs:///user/spark/applicationHistory -Dspark.history.ui.port=18088 -Xms1g -Xmx1g -XX:MaxPermSize=256m org.apache.spark.deploy.history.HistoryServer
cloudera 18659  6018  0 09:06 pts/0    00:00:00 grep HistoryServer 

3) Configuring Spark
Add the following properties in "/etc/spark/conf/spark-defaults.conf" (in this file, keys and values are separated by whitespace characters).
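The entries to add look roughly like the following sketch; <history-server-host> is a placeholder for the host running the spark-history-server, and spark.eventLog.enabled turns on event logging for applications:

spark.eventLog.enabled            true
spark.eventLog.dir                hdfs:///user/spark/applicationHistory
spark.yarn.historyServer.address  <history-server-host>:18088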



Note that the port in "spark.yarn.historyServer.address" should match the "SPARK_HISTORY_SERVER_WEBUI_PORT" set for the history server. Similarly, the value of "spark.eventLog.dir" should match the "SPARK_HISTORY_SERVER_LOG_DIR" set for the history server.

4) Now run a Spark application.
After the application has completed, go to the YARN UI for that application and click on the "History" link. It will take you to the Spark web UI for that application.

Tuesday 5 January 2016

Unit Testing Spark Job

During my time writing lots of MapReduce code and building distributed applications over the Hadoop framework, one thing I understood is that it needs a different unit testing approach than standard Java application testing. As MapReduce code is executed in the distributed Hadoop framework, we can't do both functional and performance testing as part of unit testing. Functional testing can be done using the MRUnit framework, in which we can simulate the working of a mapper/reducer using mocked input data. But for performance testing of a MapReduce job/workflow, we still have to run the job with different configurations and input data loads to find the best fit.

Sometime back, when I was developing a distributed application using Spark, I applied the same testing approach that I described earlier for Hadoop applications. I am not sure if it was by intent of the Spark developers, but unit testing a Spark job is much more obvious and easier to think through and implement than a MapReduce job.

A Spark job can be unit tested using the following "local" deploy modes:

Deploy Mode    Description
local          Run Spark locally with one worker thread (i.e. no parallelism at all).
local[K]       Run Spark locally with K worker threads (ideally, set this to the number of cores on your machine).
local[*]       Run Spark locally with as many worker threads as logical cores on your machine.

Since the first deploy mode, "local", runs with one worker thread, there will be only one executor. It can be used for functional unit testing.
After functional testing, we can run the code with the second deploy mode, "local[K]". For example, running with local[2] means two worker threads, which represents "minimal" parallelism and can help in detecting bugs that only exist when we run in a distributed context.

Another major usability factor is the ease of testing the code with these deploy modes. In contrast to MRUnit, which is a third-party dependency and needs a lot of boilerplate code, Spark's deploy modes are sweet and simple.
In the JUnit testcase for the Spark job, you can use the "setMaster" method of the sparkConf object to set the deploy mode like below:

SparkConf sparkConf = new SparkConf().setAppName("Testcase-1");
sparkConf.setMaster("local[2]");
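To make this concrete, here is a minimal sketch of what such a JUnit test might look like. The class name, test method, and sample data are hypothetical; only the setAppName/setMaster calls come from the snippet above.

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.junit.Test;

import static org.junit.Assert.assertEquals;

public class SparkJobTest {

    @Test
    public void functionalTestInLocalMode() {
        // "local[2]" runs two worker threads, enough to surface basic parallelism bugs
        SparkConf sparkConf = new SparkConf().setAppName("Testcase-1");
        sparkConf.setMaster("local[2]");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);
        try {
            // Mocked in-memory input instead of reading from HDFS
            JavaRDD<String> input = sc.parallelize(Arrays.asList("a", "b", "a"));
            // Assert on the job logic; here we simply check the element count
            assertEquals(3L, input.count());
        } finally {
            // Always stop the context so the next test can create a new one
            sc.stop();
        }
    }
}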


For performance testing, the process is similar to the one adopted for Hadoop. I ran the Spark job with different values of the configuration properties and different data loads to find the best fit.
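For illustration, the kind of runs this involves looks roughly like the following; the class name, jar, input paths, and flag values are hypothetical, not recommendations:

$ spark-submit --master yarn-client --num-executors 4 --executor-cores 2 --executor-memory 2g --class com.example.MySparkJob my-spark-job.jar hdfs:///data/sample
$ spark-submit --master yarn-client --num-executors 8 --executor-cores 4 --executor-memory 4g --class com.example.MySparkJob my-spark-job.jar hdfs:///data/full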