Quick Start: Spark with the Scala API

Spark has already proven itself a strong data processing framework for Big Data scenarios. It introduces a new data model, the RDD (Resilient Distributed Dataset), an abstraction layer over the actual data. An RDD's primary home is main memory (RAM, of course), but its partitions can be spilled to disk by the scheduler. The idea behind keeping them in-memory is to minimize disk I/O while providing a workaround similar to Hadoop's data locality. Data locality won many minds and made Hadoop the #1 data processing platform of the last decade.
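To make that spill behaviour concrete, here is a minimal sketch (the path is hypothetical, and sc is the SparkContext created further down): persisting with MEMORY_AND_DISK tells Spark to keep partitions in RAM and spill whatever doesn't fit to local disk.

import org.apache.spark.storage.StorageLevel

// Hypothetical input path; partitions stay in RAM and spill to
// local disk only when they no longer fit in memory
val lines = sc.textFile("hdfs://namenode:9000/user/scalaTest/some.csv")
lines.persist(StorageLevel.MEMORY_AND_DISK)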

I tried Spark on one of my use cases, which was originally built with Pig + Shell Script + Java UDFs. Here is a quick snapshot of the use case:

  • Read a CSV file from HDFS with 100,000,000 records (file size 5.30 GB) and schema <CreditCard Number, Cell Number, Timestamp>
  • Mask the Credit Card and Cell Number values with 'X' (a sketch of the masking helpers follows this list)
  • Add one column whose value is a UUID (a globally unique ID implementation using an MD5 checksum)
  • Write the updated CSV back to HDFS
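The listing further down calls createMaskOnCreditCard and createMaskOnCellNum without defining them. A minimal sketch of what they might look like; keeping the last four digits visible is my assumption, not necessarily the original logic:

// Assumed masking rule: replace everything but the last four digits with 'X'
def createMaskOnCreditCard(cc: String): String =
  "X" * math.max(0, cc.length - 4) + cc.takeRight(4)

def createMaskOnCellNum(cell: String): String =
  "X" * math.max(0, cell.length - 4) + cell.takeRight(4)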

I found this use case interesting for Spark, since in-memory big data manipulation has always fascinated me: MapReduce 1 and MapReduce 2 both work at the granularity of the HDFS block size (which is determined by the framework) for data manipulation.

Spark uses the partitions determined by HDFS but keeps them in memory as far as they fit. It performs transformations and actions over the memory-resident partitions and keeps track of every transformation and action in the lineage.
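A tiny sketch of that split (lines stands in for any RDD[String]): a transformation such as map is lazy and only extends the lineage, while an action such as count triggers the actual job.

val upper = lines.map(_.toUpperCase) // transformation: recorded in the lineage, nothing executes yet
val total = upper.count()            // action: materializes the partitions and runs the job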

// Imports for everything used below: Spark core, JDK crypto/time/UUID, and opencsv's CSVParser
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import java.security.MessageDigest
import java.sql.Timestamp
import java.util.UUID
import au.com.bytecode.opencsv.CSVParser

val conf = new SparkConf().setAppName("SimpleScala").setMaster("spark://master:7077")
  .set("spark.executor.memory", "2g")
val sc = new SparkContext(conf)
// Ship the application jar so the executors can load our classes
sc.addJar("/home/user/IdeaProjects/scala/out/artifacts/scala_jar/scala.jar")

// Read the CSV from HDFS and cache its partitions in memory
val csv = sc.textFile("hdfs://namenode:9000/user/scalaTest/customer.csv").cache()
// Computed once on the driver; the closure below captures both values
val PID = processId()
val MAC = macAddress()
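processId() and macAddress() are helpers the original post doesn't show. One plausible sketch using standard JDK APIs (the split on '@' relies on the common "pid@hostname" format of the RuntimeMXBean name):

import java.lang.management.ManagementFactory
import java.net.NetworkInterface
import scala.collection.JavaConverters._

// On most JVMs the runtime MX bean name is "pid@hostname"
def processId(): Int =
  ManagementFactory.getRuntimeMXBean.getName.split("@")(0).toInt

// First interface that reports a hardware (MAC) address, rendered as hex
def macAddress(): String =
  NetworkInterface.getNetworkInterfaces.asScala
    .map(_.getHardwareAddress)
    .find(_ != null)
    .map(_.map(b => f"$b%02X").mkString("-"))
    .getOrElse("unknown")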

val data: RDD[(String, String, String, String)] = csv.map(line => {
  // Create the parser inside the closure so it never has to be serialized
  val parser = new CSVParser(',')
  val columns = parser.parseLine(line)
  // MD5 over UUID + PID + MAC + timestamp yields a globally unique ID per record
  val md = MessageDigest.getInstance("MD5")
  md.update((UUID.randomUUID().toString + PID.toString + MAC +
    new Timestamp(System.currentTimeMillis()).toString).getBytes())
  (byteToHex(md.digest()), createMaskOnCreditCard(columns(0)), createMaskOnCellNum(columns(1)), columns(2))
})
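byteToHex is another helper the original omits; a straightforward version:

// Render the MD5 digest (16 bytes) as a 32-character hex string
def byteToHex(bytes: Array[Byte]): String =
  bytes.map(b => f"$b%02x").mkString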

data.saveAsTextFile("hdfs://namenode:9000/user/scalaTest/sparkOut")
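One caveat: saveAsTextFile writes each tuple via its toString, so the output lines come out as (id,XXXX,XXX,timestamp), parentheses included. To get plain CSV back, flatten the tuple first (the output path here is hypothetical):

// Join the fields with commas before writing, so the result is plain CSV
data.map { case (id, cc, cell, ts) => Seq(id, cc, cell, ts).mkString(",") }
  .saveAsTextFile("hdfs://namenode:9000/user/scalaTest/sparkOutCsv")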