Spark publishes several APIs for writing data to S3. S3 is a key part of Amazon's Data Lake strategy due to its low storage cost and optimized I/O throughput to many AWS components, so Spark-to-S3 connectivity is unavoidable when building Big Data solutions on AWS.
The best known API for writing an RDD to S3 is the saveAsTextFile(path) call. For example:

myRdd.saveAsTextFile("s3://my-s3-bucket/prefix/end-folder")

In general, you will end up with multiple files written into the path (prefix + end-folder), one from each partition, so you end up with file names like part-00000, part-00001, etc. under the same end-folder.
Where saveAsTextFile doesn't do the job
This approach assumes all the RDD data goes into a single bucket and path which are known in advance. However, there are many cases where you want to dictate the file path based on the RDD content. For example, the RDD data may have a product_id field, and you would like the product_id to be part of the path, either as a folder or as the file name. This is solved by a lesser known API:

saveAsHadoopFile(path, classOf[<keytype>], classOf[<datatype>], classOf[<OutputFormatType>])
To use this API, you must first convert your RDD into key-value tuples (a keying sketch follows below), and fill in:
- a path prefix containing at least the bucket name
- the class type of the key, generally a String
- the class type of the value, generally a String as well
- a custom OutputFormat class, which subclasses org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
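For illustration, here is a minimal sketch of the keying step, assuming the raw records are comma-separated strings with product_id in the first field (the field layout is hypothetical):

import org.apache.spark.rdd.RDD

// Hypothetical keying step: key each record by its product_id so that the
// key can later drive the output file path. The comma-separated layout and
// field position are assumptions for this sketch.
val keyedRdd: RDD[(String, String)] = myRdd.map { record =>
  val productId = record.split(",")(0) // assume product_id is the first field
  (productId, record)
}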
A very simple example of the OutputFormatType subclass could look like this:

import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

class RDDOutputFormat extends MultipleTextOutputFormat[Any, Any] {
  // Suppress the key in the file contents; only the value is written out
  override def generateActualKey(key: Any, value: Any): Any =
    NullWritable.get()

  // Use the key itself as the file name under the output path
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String = {
    key.asInstanceOf[String]
  }
}
Instead of taking the key as is, you are free to manipulate key, value and name in any way you want to create the final file name. Here the variable "name" represents the split id, i.e. the partition number of the calling RDD. In case multiple RDD partitions write data with the same key, you would want to include the split id to avoid one partition overwriting the previous one.
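As a rough sketch, a variant of generateFileNameForKeyValue that folds the split id into the file name could look like this (shown as a drop-in replacement for the method above; the path layout is just an illustration):

// Hypothetical variant: append the split id ("name") so that different
// partitions writing the same key produce distinct files instead of
// overwriting each other.
override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String = {
  key.asInstanceOf[String] + "/" + name
}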
In some cases you may want to write a single file for all data with the same key. You can do so by applying a hash re-partitioning right before calling saveAsHadoopFile.
val myHash = new HashPartitioner(20) // 20 is an example number of partitions
val repartitionedRdd = rdd.partitionBy(myHash)
This re-partitions your RDD based on the hash of the key. Since all data with the same key ends up in the same partition, the saveAsHadoopFile call is able to bundle all of it into a single file. In this case, there is no reason to include the split id as part of the file name.
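Putting the pieces together, the save call itself would look roughly like this (the bucket and prefix are placeholders):

// Assuming repartitionedRdd is an RDD[(String, String)] keyed by product_id,
// and RDDOutputFormat is the subclass defined above.
repartitionedRdd.saveAsHadoopFile("s3://my-s3-bucket/prefix",
  classOf[String], classOf[String], classOf[RDDOutputFormat])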
This is an important technique once you go beyond simple S3 storage and want to leverage the structure implicit in the file path (even though it is not reflected by a real directory structure underneath).
A case for multiple RDDs
However, there is a still more complex case where a single bucket does not suffice. Think of a case where you want to write to buckets that are mapped to the region_id field of your data. Specifying the S3 bucket essentially establishes a connection between S3 and the RDD, and a single RDD cannot be connected to multiple buckets simultaneously.
This brings the discussion to the odd concept of breaking up the RDD into a list of RDDs, which sounds like an oxymoron in the Spark world. You will see plenty of comments on the web asking you to rethink the problem, arguing that an RDD is already parallelized and meant to work as a single data set. In this use case, however, the need to break up into multiple RDDs is completely legitimate.
This brings us to two questions: how to break up an RDD, and once you have a list of RDDs, how to save them (or otherwise process them) in a parallel manner.
Breaking up the RDD
The first option that comes to mind may be to filter the RDD N times, each time keeping only the subset with a designated region_id. This would be slow, especially for a high number of region_ids and a large dataset. A more efficient equivalent leverages the mapPartitionsWithIndex method.
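For comparison, the naive filtering approach would look roughly like this (a sketch, assuming the RDD is keyed by region_id as in the rest of this article, and that regions holds the distinct region_id values):

// Naive approach, shown only for contrast: one filter pass per region_id.
val rddsByFilter = regions.map { r =>
  rdd.filter { case (regionId, _) => regionId == r }
}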
First, we re-partition the initial RDD by its region_id field, making sure every partition contains a single region_id. For this, you will need to write a simple Partitioner; call it SingleRegionPartitioner:
import org.apache.spark.Partitioner

class SingleRegionPartitioner(partitions: Int, regionIds: List[String]) extends Partitioner {
  def numPartitions: Int = partitions
  def regionIdList: List[String] = regionIds

  // Map each key (a region_id) to the index of its dedicated partition
  def getPartition(key: Any): Int = key match {
    case null => 0
    case _ => regionIdList.indexOf(key)
  }

  override def equals(other: Any): Boolean = other match {
    case c: SingleRegionPartitioner => c.numPartitions == numPartitions
    case _ => false
  }

  override def hashCode: Int = numPartitions
}
val regions = rdd.map(_._1).distinct().collect
println("number of regions = " + regions.length)
val regionHash = new SingleRegionPartitioner(regions.length, regions.toList)
val partitionedRdd = rdd.partitionBy(regionHash)
At this point, each partition contains only data for one region_id. Now we can break up the RDD using the following technique. Create a class that accepts a partition index in its constructor and provides a method that passes through the iterator only for the matching partition (which, thanks to the re-partitioning above, corresponds to a single region_id).
class RddFromPartition(i: Int) extends Serializable {
  def pickOnePartition(index: Int, iter: Iterator[(String, String)]): Iterator[(String, String)] = {
    if (index == i) {
      iter
    } else {
      Iterator() // empty iterator
    }
  }
}
Next we call mapPartitionsWithIndex, feeding one partition index (and therefore one region_id) at a time. Since the lambda function only returns the iterator without looping through its elements, this call completes very rapidly. If region_id has extremely high cardinality, the loop can even be parallelized with Scala multi-threading.
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.rdd.RDD

val rdds = ArrayBuffer[RDD[(String, String)]]()
for (i <- 0 to regions.length - 1) {
  val oneRdd = partitionedRdd.mapPartitionsWithIndex(
    new RddFromPartition(i).pickOnePartition, false)
  println("oneRdd size = " + oneRdd.count)
  rdds.append(oneRdd)
}
Process the RDD list in parallel
Now that we have a list of RDDs, we want to call saveAsHadoopFile on each one, feeding the corresponding bucket name as the path prefix. This can be achieved with a loop augmented with Scala Futures.
import scala.concurrent._
import ExecutionContext.Implicits.global

rdds.map { regionRdd =>
  val rddCount = regionRdd.count
  val region = regionRdd.take(1)(0)._1
  val regionBucket = pathMap.get(region).get
  Future(regionRdd.saveAsHadoopFile(regionBucket,
    classOf[String], classOf[String], classOf[SimpleMultipleTextOutputFormat]))
}
Here, pathMap is a predefined map from region_id to S3 path. The Future wrapper allows the saveAsHadoopFile call to return immediately. If you need to know when all files have been saved, the code can collect the futures and wait for their completion.
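As a minimal sketch, assuming the result of the rdds.map { ... } block above is captured in a value named saveFutures (a hypothetical name, not shown in the snippet), the wait could look like this:

import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration

// Block until every save Future has completed (or one of them fails).
// Relies on the execution context already imported above.
Await.result(Future.sequence(saveFutures), Duration.Inf)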
As a final word, S3 connections are not the most reliable in the world, so you will likely want to put the save inside a retry wrapper. Here, saveByKey is a function to be passed as an argument into the retry wrapper.
def saveByKey(path: String, keyedRdd: RDD[(String, String)]) = {
  keyedRdd.saveAsHadoopFile(path,
    classOf[String], classOf[String], classOf[SimpleMultipleTextOutputFormat])
}
import scala.util.control.Breaks._

def retry(callback: => Unit, maxTries: Int = 3) {
  var count = 0
  breakable {
    while (count < maxTries) {
      try {
        callback
        break
      } catch {
        case ex: Exception =>
          println(ex.getMessage)
          count += 1
          if (count == maxTries) {
            throw ex
          }
          Thread.sleep(1000)
      }
    }
  }
}
Then change the Future call to:

Future(retry(saveByKey(regionBucket, regionRdd)))
Summary
This article addresses the need to write RDD data to multiple S3 buckets based on a particular attribute of the content. It presents techniques to efficiently break the original RDD into a list of RDDs and then run the save calls in parallel.