Saturday, February 6, 2016

Time series is a specialty area within the field of analytics. Its best-known application is in econometrics. In this post, I'd like to introduce a simple and admittedly hypothetical time series use case to stimulate interest.

The most prevalent analytics formula you will see in statistics is linear regression, something of the form y = β₀ + β₁x + ε.

In time series, the independent variable x represents lagged values of the regressand y, or of one or more other independent variables.
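Putting the pieces together, a general form looks something like this (the notation here is standard but illustrative):

$$ y_t = c + \delta t + \sum_{i=1}^{p} \phi_i\, y_{t-i} + \sum_{j=1}^{q} \theta_j\, x_{t-j} + \varepsilon_t $$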

You can see that in addition to the x and y terms, sometimes the equation can be further complicated with a constant, a linear trend, and a noise term. It's not hard to see how mathematicians can really run wild with something of this complexity. However, as always, once you break down the different aspects, one finds that the beast is tamable after all.

So what are the key components?

1. auto-regression – when y depends on its own past values, it is known as auto-regression, for obvious reasons

2. moving average – when y depends on past values of another independent variable x, it is called a moving average, also obvious once you think about it

3. constant – the constant term is a simple bias, usually set to zero to simplify the derivations

4. linear trend – a linear trend representing a continuous increase or decrease of y over time

5. noise – finally, there is always a noise term, which explains away any discrepancy in the predicted y value.

Statisticians use the term iid (independent and identically distributed) to characterize this noise term.

So where does time series prediction come in, in the real world? You can probably come up with a dozen different ideas off the bat, but none as entertaining as what Thurman and Fisher published on the age-old question of which came first: the chicken or the egg? You should never accuse statisticians of being humorless after this.

Walter Thurman and Mark Fisher, “Chickens, Eggs, and Causality, or Which Came First?”, American Journal of Agricultural Economics, May 1988

They took actual data on US egg production and chicken population from 1930 to 1983 and performed a Granger causality test. This test compares the predictive power on y when using lagged values of 1) both x and y versus 2) y only. You can find an exercise in R which carries out the computation here, attributed to Cory Lemeister:


Note that Clive Granger himself pointed out that his method is often misinterpreted and abused. When the method shows that x causes y, it may not be true causation; rather, there may be another factor z which causes both x and y, but whose effect on x becomes observable before its effect on y. For this reason, the proper term to use is x Granger-causes y, rather than x causes y.
Professor Dave Giles from the University of Victoria wrote up a very good blog post on the equations and concepts involved in performing a Granger causality test.


First, notice the use of the term VAR (Vector Auto Regression). The word vector appears because there are two equations, one predicting x and one predicting y. VAR, along with its partner concept VECM (Vector Error Correction Model), had such a huge impact on the field of econometrics that the man who demonstrated its practical use, Christopher Sims, received the Nobel prize in economics in 2011.
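Concretely, a two-variable VAR for the Granger test can be written roughly as the following pair of equations (the notation here is illustrative; Giles's post gives the exact form):

$$ y_t = a_0 + \sum_{i=1}^{p} a_i\, y_{t-i} + \sum_{i=1}^{p} b_i\, x_{t-i} + \varepsilon_t \tag{1} $$

$$ x_t = c_0 + \sum_{i=1}^{p} c_i\, x_{t-i} + \sum_{i=1}^{p} d_i\, y_{t-i} + \eta_t \tag{2} $$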

The gist of the technique is to try to eliminate all the b parameters in eqn 1 and all the d parameters in eqn 2. For example, if all the b parameters in eqn 1 are gone, then y does not depend on x at all, and therefore x cannot Granger-cause y. In statistics speak, one would reject the null hypothesis that the b parameters are zero (or, upon failing to reject, accept the null hypothesis), which would imply there is Granger causality (or, in the reverse case, that there isn't). The rejection step uses a number of standard statistical tests. Giles's blog mentions the Wald test, while Lemeister uses p-values, each of which is a lesson by itself.

If you believe the results, apparently eggs Granger-cause chickens!

Friday, December 11, 2015

Multiple RDDs write to S3 in parallel

Spark has several published APIs for writing to S3. S3 is a key part of Amazon's Data Lake strategy due to its low storage cost and optimized I/O throughput to many AWS components. Spark-to-S3 connectivity is inescapable when working with Big Data solutions on AWS.

The best-known API for writing an RDD to S3 is the saveAsTextFile(path) call. For example:

   myRdd.saveAsTextFile("s3://my-s3-bucket/prefix/end-folder")

In general, you will end up with multiple files written into the path (prefix + end-folder), one from each partition, so you end up with file names like part-00000, part-00001, etc. under the same end-folder.

Where saveAsTextFile doesn't do the job

This assumes, however, that all the RDD data goes into a single bucket and path known in advance. There are many cases where you want to dictate the file path based on the RDD content. For example, the RDD data may have a field for product_id, and you would like the product_id to be part of the path, either as a folder or as the file name. This is solved by a lesser-known API:

  saveAsHadoopFile(path, classOf[<keyType>], classOf[<valueType>], classOf[<OutputFormatType>])

To use this API, you must first convert your RDD into a key-value tuple (see the sketch after this list), and fill in
  • a path prefix containing at least the bucket name
  • the class type of the key, generally a String
  • the class type of the value, generally a String as well
  • a custom OutputFormat class, which subclasses org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
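As a minimal sketch of the keying step (the record type and field names here are hypothetical, echoing the product_id example above):

// Hypothetical: key each record by its product_id so the OutputFormat can
// use the key to build the S3 file path; the value becomes the file contents.
case class Record(productId: String, payload: String)

val keyedRdd = myRdd.map(rec => (rec.productId, rec.payload))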

A very simple example of the OutputFormatType subclass could look like this

import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

class RDDOutputFormat extends MultipleTextOutputFormat[Any, Any] {

  // Suppress the key in the file contents; only the value gets written.
  override def generateActualKey(key: Any, value: Any): Any =
    NullWritable.get()

  // Use the key itself as the file name under the path prefix.
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String = {
    key.asInstanceOf[String]
  }
}

However, instead of taking the key as is, you are free to manipulate key, value, and name in any way you want to create the final file name. Here the variable "name" represents the split id, a.k.a. the partition number of the calling RDD. If multiple RDD partitions are writing data with the same key, you would want to include the split id to avoid one partition overwriting another's output.
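For example, a variant (just a sketch; the class name is made up) that nests each key's output in its own folder while keeping the split id could look like this, using the same imports as above:

// Hypothetical variant: put each key's files in a folder named after the key,
// and keep the split id ("name", e.g. part-00007) so that partitions writing
// the same key do not overwrite each other.
class KeyFolderOutputFormat extends MultipleTextOutputFormat[Any, Any] {

  override def generateActualKey(key: Any, value: Any): Any =
    NullWritable.get()

  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
    key.asInstanceOf[String] + "/" + name
}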

In some cases you may want to write a single file for all data with the same key. You can do so by hash re-partitioning right before calling saveAsHadoopFile.

import org.apache.spark.HashPartitioner

val myHash = new HashPartitioner(20)   // 20 is an example number of partitions
val repartitionedRdd = rdd.partitionBy(myHash)

What this does is re-partition your RDD based on the hash of the key. Since all data with the same key is grouped into the same partition, the saveAsHadoopFile call is able to bundle all of it into a single file. In this case, you would have no reason to include the split id as part of the file name.
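Putting it together, the save call could then look like this (a sketch; the bucket path and the RDDOutputFormat class are the illustrative ones from above):

// Write one file per key: the RDD has already been hash-partitioned by key,
// and RDDOutputFormat names each output file after its key.
repartitionedRdd.saveAsHadoopFile(
  "s3://my-s3-bucket/prefix",
  classOf[String],
  classOf[String],
  classOf[RDDOutputFormat])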

This is a very important technique once you go beyond simple S3 storage and actually want to leverage the structure implicit in the file path (even though S3 has no real directory structure underneath).

A case for multiple RDDs

However, there is a still more complex case, where a single bucket does not suffice. Think of a case where you want to write to buckets that are mapped to the region_id field of your data. The specification of the S3 bucket essentially establishes a connection between S3 and the RDD, and a single RDD cannot be connected to multiple buckets simultaneously. This brings the discussion to the odd concept of breaking up the RDD into a list of RDDs, which is an oxymoron in the Spark world. You will see plenty of comments on the web asking you to rethink the problem, pointing out that an RDD is already parallelized and meant to work as a single data set. In this use case, however, the need to break up into multiple RDDs is completely legitimate.

This brings us to two questions: how to break up an RDD, and, once you have a list of RDDs, how to save them (or otherwise process them) in parallel.

Breaking up the RDD

The first option that comes to mind may be to filter the RDD N times, each time filtering in the subset with a designated region_id; this naive approach is sketched below. It would be slow, especially for a high number of region_ids and a large dataset. A more efficient, parallel equivalent leverages the mapPartitionsWithIndex method.
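For reference, the naive filter loop might look like this (a sketch, assuming the RDD is keyed by region_id):

// Naive approach (sketch): one full filter pass per region_id.
// Each filter scans the entire dataset, so this makes N passes over the data.
val regionIds = rdd.map(_._1).distinct().collect().toList
val rddsByFilter = regionIds.map { r => rdd.filter { case (regionId, _) => regionId == r } }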

First, we re-partition the initial RDD by its region_id field, taking care to ensure that every partition contains a single region_id. For this, you will need to write a simple Partitioner; call it SingleRegionPartitioner.

import org.apache.spark.Partitioner

// Maps each region_id key to its own partition, using the key's position
// in the supplied regionIds list as the partition number.
class SingleRegionPartitioner(partitions: Int, regionIds: List[String]) extends Partitioner {
 def numPartitions: Int = partitions
 def regionIdList: List[String] = regionIds

 def getPartition(key: Any): Int = key match {
  case null => 0
  case _ => regionIdList.indexOf(key)
 }

 override def equals(other: Any): Boolean = other match {
  case c: SingleRegionPartitioner => c.numPartitions == numPartitions
  case _ => false
 }

 override def hashCode: Int = numPartitions
}

// The RDD is keyed by region_id, so the distinct keys give the list of regions.
val regions = rdd.map(_._1).distinct().collect
println("number of regions = " + regions.length)

val regionHash = new SingleRegionPartitioner(regions.length, regions.toList)
val partitionedRdd = rdd.partitionBy(regionHash)


At this point, each partition contains only data for one region_id. Now we can break up the RDD using the following technique: create a class which accepts a partition index in its constructor (each index corresponds to one region_id) and provides a method that returns the partition's iterator only when the index matches.

class RddFromPartition(i: Int) extends Serializable {

 def pickOnePartition(index: Int, iter: Iterator[(String, String)]): Iterator[(String, String)] = {
  if (index == i) {
   iter
  } else {
   Iterator()    // empty iterator
  }
 }
}

Next, we call mapPartitionsWithIndex, feeding in one partition index (i.e., one region_id) at a time. Since the lambda function only returns the iterator without looping through its elements, each call completes very rapidly. If region_id has extremely high cardinality, the loop can even be parallelized using Scala multi-threading.

import scala.collection.mutable.ArrayBuffer
import org.apache.spark.rdd.RDD

val rdds = ArrayBuffer[RDD[(String, String)]]()

for (i <- 0 until regions.length) {
 // Keep only the partition whose index matches this region.
 val oneRdd = partitionedRdd.mapPartitionsWithIndex(new RddFromPartition(i).pickOnePartition, false)
 println("oneRdd size = " + oneRdd.count)   // optional sanity check; note this triggers a Spark job
 rdds.append(oneRdd)
}


Process the RDD list in parallel

Now that we have a list of RDDs, we want to call saveAsHadoopFile on each, feeding in the corresponding bucket name as the path prefix. This can be achieved with a loop augmented with Scala Futures.

import scala.concurrent._
import ExecutionContext.Implicits.global

// Collect the futures so we can (optionally) wait on them later.
val saveFutures = rdds.map { regionRdd =>
 // Each RDD in the list holds a single region, so one record gives us the region_id.
 val region = regionRdd.take(1)(0)._1
 // pathMap is a predefined map from region_id to its S3 bucket/path prefix.
 val regionBucket = pathMap(region)

 Future {
  // SimpleMultipleTextOutputFormat is a MultipleTextOutputFormat subclass
  // like the one shown earlier.
  regionRdd.saveAsHadoopFile(regionBucket, classOf[String],
   classOf[String], classOf[SimpleMultipleTextOutputFormat])
 }
}


Here, pathMap is a predefined map from region_id to S3 path. The Future wrapper allows the saveAsHadoopFile call to return immediately. If it is of interest to know when all files have been saved, the code can collect all the futures and wait for them to complete.
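For example, a minimal sketch that blocks until every regional save has finished, using the saveFutures collected above:

import scala.concurrent.duration._

// Combine all the futures and wait (here, indefinitely) for the saves to finish.
Await.result(Future.sequence(saveFutures), Duration.Inf)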

As a final word, S3 connections are not the most reliable in the world, so you will likely want to put the save inside a retry wrapper. Here, saveByKey is a function whose invocation is passed as a by-name argument into the retry wrapper.


import scala.util.control.Breaks._

def saveByKey(path: String, keyedRdd: RDD[(String, String)]): Unit = {
 keyedRdd.saveAsHadoopFile(path, classOf[String], classOf[String],
  classOf[SimpleMultipleTextOutputFormat])
}

// Runs the by-name callback, retrying up to maxTries times with a short pause
// between attempts; rethrows the last exception if every attempt fails.
def retry(callback: => Unit, maxTries: Int = 3): Unit = {
 var count = 0

 breakable {
  while (count < maxTries) {
   try {
    callback
    break      // success: exit the retry loop
   } catch {
    case ex: Exception =>
     println(ex.getMessage)
     count += 1
     if (count == maxTries) {
      throw ex
     }
     Thread.sleep(1000)
   }
  }
 }
}


Then change the future call to

Future(retry(saveByKey(regionBucket, regionRdd)))

Summary

This article addresses the need to write RDD data to multiple S3 buckets based on a particular attribute of the content. It presents techniques to efficiently break the original RDD into a list of RDDs, and then run the save operations in parallel.