Friday, December 11, 2015

Multiple RDDs write to S3 in parallel

Spark has several published APIs for writing RDDs to S3. S3 is a key part of Amazon's Data Lake strategy thanks to its low storage cost and high I/O throughput to many AWS components, so Spark – S3 connectivity is inescapable when working with Big Data solutions on AWS.

The best known API for writing an RDD to S3 is the saveAsTextFile(path) call. For example:

   myRdd.saveAsTextFile("s3://my-s3-bucket/prefix/end-folder")

In general, you will end up with multiple files written into the path (prefix + end-folder), one from each partition, so you end up with file names like part-00000, part-00001, etc. under the same end-folder.

Where saveAsTextFile doesn't do the job

This approach assumes all the RDD data goes into a single bucket and path that are known in advance. However, there are many cases where you want to derive the file path from the RDD content. For example, the RDD data may have a product_id field, and you would like the product_id to be part of the path, either as a folder or as the file name. This is solved by a lesser known API

  saveAsHadoopFile(path, classOf[<keytype>], classOf[<datatype>], classOf[<OutputFormatType>])

To use this API, you must first convert your RDD into key-value tuples (a minimal conversion is sketched after the list), and fill in
  • a path prefix containing at least the bucket name
  • the class type of the key, generally a String
  • the class type of the value, generally a String as well
  • a custom OutputFormat class, which subclasses org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
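
As a minimal sketch of the key-value conversion, assume (purely for illustration) that each record is a pipe-delimited line whose first field is product_id; rawRdd and the record layout are hypothetical:

import org.apache.spark.rdd.RDD

// Hypothetical record layout: "product_id|rest of record".
// Key each record by its product_id so the OutputFormat can derive the file path from the key.
val keyedRdd: RDD[(String, String)] = rawRdd.map { line =>
 val productId = line.split('|')(0)
 (productId, line)
}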

A very simple example of the OutputFormatType subclass could look like this

import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

class RDDOutputFormat extends MultipleTextOutputFormat[Any, Any] {

  // Drop the key from the written records; only the value goes into the file.
  override def generateActualKey(key: Any, value: Any): Any =
    NullWritable.get()

  // Use the key itself as the file name, relative to the output path.
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String = {
    key.asInstanceOf[String]
  }
}

However, instead of taking the key as is, you are free to combine key, value and name in any way you want to create the final file name. Here the variable "name" represents the split id, a.k.a. the partition number of the calling RDD. If multiple RDD partitions write data with the same key, you would want to include the split id to avoid one partition overwriting the output of another.
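
Putting it together, here is a hedged sketch of a complete call; SplitAwareOutputFormat and keyedRdd are illustrative names from the earlier sketch, and the generated file name appends the split id so partitions sharing a key do not overwrite each other:

// Illustrative variant: file name = key + "/" + split id.
class SplitAwareOutputFormat extends MultipleTextOutputFormat[Any, Any] {
 override def generateActualKey(key: Any, value: Any): Any = NullWritable.get()
 override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
  key.asInstanceOf[String] + "/" + name
}

keyedRdd.saveAsHadoopFile("s3://my-s3-bucket/prefix",
 classOf[String], classOf[String], classOf[SplitAwareOutputFormat])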

In some cases you may want to write a single file for all data with the same key. You can do so by applying a hash re-partitioning right before calling saveAsHadoopFile.

import org.apache.spark.HashPartitioner

val myHash = new HashPartitioner(20) // 20 is an example number of partitions you want
val repartitionedRdd = rdd.partitionBy(myHash)

What this does is re-partition your RDD based on the hash of your key. Since all data with the same key lands in the same partition, the saveAsHadoopFile call is able to bundle all of it into a single file. In this case, you have no reason to include the split id as part of the file name.
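
As a brief sketch under the same assumptions, saving the repartitioned RDD with the key-only RDDOutputFormat defined above yields one file per key (the bucket and prefix are the example values from earlier):

// One file per key, since each key's data now lives in a single partition.
repartitionedRdd.saveAsHadoopFile("s3://my-s3-bucket/prefix",
 classOf[String], classOf[String], classOf[RDDOutputFormat])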

This is a very important technique once you go beyond simple S3 storage and want to leverage the structure implicit in the file path (even though it is not reflected by a real directory structure underneath).

A case for multiple RDDs

However, there is a still more complex case where a single bucket does not suffice. Think of a case where you want to write to buckets that are mapped to the region_id field of your data. Specifying the S3 bucket essentially establishes a connection between S3 and the RDD, and a single RDD cannot be connected to multiple buckets simultaneously. This brings the discussion to the odd concept of breaking up the RDD into a list of RDDs, which is an oxymoron in the Spark world. You will see plenty of comments on the web asking you to rethink the problem, pointing out that an RDD is already parallelized and meant to work as a single data set. In this use case, however, the need to break up into multiple RDDs is completely legitimate.

This brings us to two questions: how to break up an RDD, and once you have a list of RDDs, how to save them (or otherwise process them) in parallel.

Breaking up the RDD

The first option that comes to mind may be to filter the RDD N times, each time keeping only the subset with a designated region_id; a naive sketch of this follows. This would be slow, especially for a high number of region_ids and a large dataset, since every filter scans the full RDD. A more efficient route leverages the mapPartitionsWithIndex method.
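
For contrast, the naive filter-based approach would look roughly like this (assuming, as in the rest of this post, an RDD of (region_id, record) tuples):

// Naive approach: one full scan of the data set per region_id.
val regionIds = rdd.map(_._1).distinct().collect()
val rddsByFilter = regionIds.map { regionId =>
 rdd.filter { case (key, _) => key == regionId }
}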

First, we re-partition the initial RDD by its region_id field, making sure every partition contains a single region_id. For this, you will need to write a simple Partitioner; call it SingleRegionPartitioner:

import org.apache.spark.Partitioner

// Maps each region_id key to its own partition, based on its position in the region_id list.
class SingleRegionPartitioner(partitions: Int, regionIds: List[String]) extends Partitioner {
 def numPartitions: Int = partitions
 def regionIdList: List[String] = regionIds

 def getPartition(key: Any): Int = key match {
  case null => 0
  case _ => regionIdList.indexOf(key)
 }

 override def equals(other: Any): Boolean = other match {
  case c: SingleRegionPartitioner => c.numPartitions == numPartitions
  case _ => false
 }

 override def hashCode: Int = numPartitions
}

val regions = rdd.map(_._1).distinct().collect
println("number of regions = " + regions.length)
val regionHash = new SingleRegionPartitioner(regions.length, regions.toList)
val partitionedRdd = rdd.partitionBy(regionHash)


At this point, each partition contains only data for one region_id. Now we can break up the RDD using the following technique. Create a class which accepts a partition index in its constructor, and provides a method that returns that partition's iterator unchanged and an empty iterator for every other partition.

// Keeps only the partition whose index matches i; all other partitions contribute nothing.
class RddFromPartition(i: Int) extends Serializable {

 def pickOnePartition(index: Int, iter: Iterator[(String, String)]): Iterator[(String, String)] = {
  if (index == i) {
   iter
  } else {
   Iterator()    // empty iterator
  }
 }
}

Next we call mapPartitionsWithIndex, feeding one partition index (i.e., one region_id) at a time. Since the lambda function only returns the iterator without looping through its elements, each call completes very rapidly. If region_id has extremely high cardinality, the loop can even be replaced with scala multi-threading, as sketched after the loop below.

import scala.collection.mutable.ArrayBuffer
import org.apache.spark.rdd.RDD

val rdds = ArrayBuffer[RDD[(String, String)]]()

for (i <- 0 to regions.length - 1) {
 val oneRdd = partitionedRdd.mapPartitionsWithIndex(new RddFromPartition(i).pickOnePartition, false)
 println("oneRdd size = " + oneRdd.count) // note: count triggers a Spark job for each sub-RDD
 rdds.append(oneRdd)
}
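
If the number of regions is extremely high, the driver-side loop above can itself be parallelized. A minimal sketch using Scala parallel collections (assuming a pre-2.13 Scala where .par is available without an extra module):

// Build the sub-RDDs concurrently on the driver's thread pool.
val rddsPar = (0 until regions.length).par.map { i =>
 partitionedRdd.mapPartitionsWithIndex(new RddFromPartition(i).pickOnePartition, false)
}.toList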


Process the RDD list in parallel

Now that we have a list of RDDs, we want to call saveAsHadoopFile on each, feeding the corresponding bucket name as the path prefix. This can be achieved with a plain loop augmented with scala Futures.

import scala.concurrent._
import ExecutionContext.Implicits.global

val saveFutures = rdds.map { regionRdd =>
 val region = regionRdd.take(1)(0)._1 // every record in this sub-RDD shares the same region_id
 val regionBucket = pathMap(region)   // look up the S3 path for this region_id

 Future(regionRdd.saveAsHadoopFile(regionBucket, classOf[String],
  classOf[String], classOf[RDDOutputFormat]))
}


Here, pathMap is a predefined map from region_id to S3 path. The Future wrapper allows the saveAsHadoopFile call to return immediately. If you need to know when all files have been saved, the code can collect all the futures and wait for their completion, as sketched below.
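
A minimal sketch of waiting for completion, using the saveFutures collected by the loop above (the two-hour timeout is purely illustrative):

import scala.concurrent.duration._

// Block the driver until every regional save has completed, or fail after the timeout.
Await.result(Future.sequence(saveFutures), 2.hours)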

As a final word, S3 connections are not the most reliable in the world, so you will likely want to put the save inside a retry wrapper. Here saveByKey is a function to be passed as an argument into the retry wrapper.


def saveByKey(path: String, keyedRdd: RDD[(String, String)]) = {
 keyedRdd.saveAsHadoopFile(path, classOf[String], classOf[String],
  classOf[RDDOutputFormat])
}

import scala.util.control.Breaks._

// Retries the callback up to maxTries times, sleeping one second between attempts.
def retry(callback: => Unit, maxTries: Int = 3) {
 var count = 0

 breakable {
  while (count < maxTries) {
   try {
    callback
    break
   } catch {
    case ex: Exception =>
     println(ex.getMessage)
     count += 1
     if (count == maxTries) {
      throw ex
     }
     Thread.sleep(1000)
   }
  }
 }
}


Then change the future call to

Future(retry(saveByKey(regionBucket, regionRdd)))

Summary

This article addresses the need to write RDD data to multiple S3 buckets based on a particular attribute of the content. It presents techniques to efficiently break the original RDD into a list of RDDs and then run the save operations in parallel.