Wednesday, December 17, 2014

A real-time streaming implementation of Markov chain based fraud detection


Fraud is a fact of life for the financial industry. PayPal did not become one of the few dotcom survivors by remaining a pure supplier of transaction engines. While the final fraud determination is still in the hands of human experts, there has been much interest in automated processes that can siphon out suspicious activities for further scrutiny. Given the volume of global credit card transactions, the problem falls squarely in the domain of big data. Beyond handling that volume, financial institutions also face the technical challenge of catching fraudulent transactions as they happen. All of this points to the need for a real time streaming analysis capability.

Open source big data technologies have been advancing by leaps and bounds, with the most recent push toward in-memory and streaming computation. This project is a prototype implementation of a Markov chain based sequence mining algorithm: it combines incoming credit card transactions with each customer's most recent transaction history and makes a real time prediction based on the likelihood of the buying pattern. It is inspired by two excellent prior open source projects: 1) Implementing a real-time data pipeline with Spark Streaming, which demonstrated the seamless coupling between Spark and Kafka for developing streaming applications, and 2) Real Time Fraud Detection with Sequence Mining, which presented a novel method for outlier detection based in turn on Markov chains, classifiers and intrusion detection. No attempt has been made to improve on the author's original algorithm. This project is focused solely on an alternative system design, one that leverages newer technological options while completing a realistic streaming environment beyond a batch simulation.

The code for this project can be found at

System overview

The scenario


A person's credit card charge events are spread sporadically over time. In order to capture someone's spending incidents for the purpose of outlier detection, individual events need to be collected as they occur and prepended with historical spending incidents. This necessitates both a real time stream capture device and an archive of previous events. Furthermore, a system that must handle very large data throughput at low latency requires the fastest streaming computing engine available. The author's choice for running this prediction algorithm is Apache Spark, which, although it only recently appeared on the big data scene, is already drawing enthusiastic notice from this fast moving industry. Spark couples very well with Kafka for data streaming and with HBase for persistence. HBase is a well-established NoSQL database known for its high capacity and high I/O throughput.

The prediction pipeline


The CCStreamingFeed class generates simulated credit card charge events and sends them into the Kafka topic "real-time-cc". Our Spark code, "MarkovPredictor", picks up the streaming feed, identifies the customer for each event, finds matching historical events, and assembles the events into a sequence of the configured size (set in the "app_config.properties" file; the default is 5). The assembled sequences are piped into the Kafka topic "user-trans-history" for monitoring purposes.
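
As a rough illustration of the feed side, the sketch below pushes one simulated event into the "real-time-cc" topic using the Kafka 0.8 producer API; the broker address and the use of plain string messages are assumptions, not the project's actual CCStreamingFeed code.

import java.util.Properties
import kafka.javaapi.producer.Producer
import kafka.producer.{KeyedMessage, ProducerConfig}

val props = new Properties()
props.put("metadata.broker.list", "localhost:9092")              // assumed broker address
props.put("serializer.class", "kafka.serializer.StringEncoder")

val producer = new Producer[String, String](new ProducerConfig(props))
// customer id, transaction id, feature token -- same format as the training feed
producer.send(new KeyedMessage[String, String]("real-time-cc", "G5XZW85212,JXCT97WT1AKR,LNL"))
producer.close()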

The prediction process matches the input sequence against the transition matrix in the trained model, which is stored in the HDFS file "/data/streaming_analysis/markovmodel.txt". Any sequence deemed suspicious is piped into the third Kafka topic, "candidate-fraud-trans". For demo purposes, this can be displayed in a Kafka console consumer. In a real system, it would be channeled into a downstream system that brings it to the attention of a human fraud investigator.




The training step


During training, we enter a large number of customers into the data generator (say 500). The trainer Spark code, "MarkovModelTrainer", picks up the large data stream, sorts the data by customer id and generates 500 sets of credit card spending sequences. Spark excels at this kind of grouping and aggregation. After the total number of occurrences for each state-to-state transition has been tallied and normalized, the resulting numbers are stored in HDFS as the prediction model.


Each of the system components is introduced next, followed by a recap of Pranab's algorithm.

Implementation overview

In this implementation, the stack includes four major components: Spark, HDFS, HBase and Kafka, all clusterizable open source tools. In other words, they are built for redundancy and scalability. We start with a brief introduction to each component; more in-depth discussions can easily be found on the web.

Spark


Spark has its academic origin in the Berkeley AMP Lab and is charting new territory in Apache history with its unprecedentedly high commit rate. Any industry watcher has to be aware of its rising potential as a juggernaut among big data platforms. It incorporates a general DAG pipeline structure for data processing, leverages in-memory data caching for speed, relies on processing lineage for resiliency against node failure, and opens up development options to Java, Scala, Python and R programmers. It recently added YARN support to maximize cluster resource usage. A companion project, MLlib, is incorporated into Spark releases; it aims to boost both the ease of use and the scalability of machine learning algorithms, making them accessible to those of us who are not PhD statisticians.


Spark itself is implemented in Scala. The Scala collections and functional-programming paradigm is a perfect match for Spark's concept of the RDD (Resilient Distributed Dataset), which applies successive grouping and reduction steps to a large data set on a cluster of high-memory machines. Spark has built-in capability to detect the locality of HDFS data. A typical Spark application interacts with a variety of data sources including HDFS, HBase, Cassandra and Kafka.

Once Spark has been started, the user can monitor job submissions from its web UI.


One particularly interesting feature of Spark is its capability for mixing an incoming stream with archived data. Spark Streaming uses a micro-batching technique: data collected over a pre-defined time window is converted into an RDD and fed into the processing pipeline. The developer can dictate how rapidly to slide such a window forward in time for continuous data collection and processing.
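
As a minimal sketch (the 10-second batch interval here is an assumption, not the project's configured value), the streaming context is created with its micro-batch duration, and every batch of collected records then surfaces as one RDD in the resulting DStream.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Each 10 seconds of collected data becomes one RDD in the DStream.
val sparkConf = new SparkConf().setAppName("MarkovPredictor")
val streamingContext = new StreamingContext(sparkConf, Seconds(10))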



However, it should be noted that the streaming in this project is not based on a timed window, but rather on the sequence size. While a timed window works wonderfully for a continuous stream, credit card spending is more sporadic in nature. At the programming level, this means we call reduceByKey on the DStream rather than reduceByKeyAndWindow.
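
For illustration only (the durations are assumptions, and "pairs" stands for a DStream of (customerId, tokens) such as the one built later from the Kafka feed), the two calls differ as follows:

import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.dstream.DStream

def contrast(pairs: DStream[(String, String)]): Unit = {
  // Windowed: re-aggregate the last 60 seconds of pairs, sliding every 20 seconds.
  val windowed = pairs.reduceByKeyAndWindow((a: String, b: String) => a + "," + b, Seconds(60), Seconds(20))
  // Per batch: aggregate only the pairs collected in the current micro-batch.
  val perBatch = pairs.reduceByKey((a, b) => a + "," + b)
}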

One of the easiest ways to feed streaming data into Spark is to leverage the Kafka API. This brings the discussion to our second major component, Kafka, which is in so many ways a marvel in its own right.

Kafka


Kafka is a queue, a publish-subscribe middleware, a streaming source, a centralized logging framework, and much more. It comes with built-in partitioning and replication over clustered hardware, a command line interface and a multi-language API. In addition to partitioning, Kafka achieves its high performance by leveraging the high sequential write speed of modern disk drives (on the order of 600 MB/sec), the OS page cache (no JVM garbage collection), the sendfile system call, simple reads of and appends to files (O(1) latency), network packet batching, and compression of batched messages.


Messages sent into Kafka are persisted for a configurable period and then discarded. Parallelism through partitioning gives Kafka effectively constant performance with respect to data size.

Each message is tracked by a sequential id called the offset, which is kept on a per-consumer basis. Consumers can be labeled with a group. Within each group, only one consumer gets to consume a particular message, while consumers in different groups each receive their own copy of every message independently. The consumer group concept therefore allows Kafka to fulfill the two traditional models of queuing (all consumers in one group) and publish-subscribe (every consumer in its own group), and any combination in between.
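
For illustration (the ZooKeeper address and group name are assumptions), group membership for a Kafka 0.8 high-level consumer is just a property: consumers started with the same group.id split the topic's partitions between them, while consumers with distinct group ids each see every message.

import java.util.Properties
import kafka.consumer.{Consumer, ConsumerConfig}

val consumerProps = new Properties()
consumerProps.put("zookeeper.connect", "localhost:2181")   // assumed ZooKeeper address
consumerProps.put("group.id", "fraud-monitors")            // same id => queue semantics within the group
consumerProps.put("auto.offset.reset", "smallest")
val connector = Consumer.create(new ConsumerConfig(consumerProps))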


The producer, on the other hand, works with the cluster by picking which partition to write to, either in round-robin fashion or based on a semantic rule such as a hash of the message key.

Kafka has been applied as a high throughput replacement for message brokers such as RabbitMQ, and for web activity tracking, metric or log aggregation, event sourcing, and commit logs. For the purposes of this article, the key advantage is its close integration with Spark in the form of the KafkaUtils class. More on this later.

HBase


HBase is a column family database, most commonly using HDFS as its persistence layer. It scales linearly in both performance and storage capacity, making it an obvious candidate for big data use cases. The data set is partitioned across multiple region servers based on row keys.

HBase, and NoSQL databases in general, cannot be understood in terms of the usual SQL database abstractions. Its high read/write efficiency is a direct result of leveraging contiguous file access (much like Kafka). Whether designing the schema or operating HBase, it helps to visualize how the data is laid out in files.

The HBase data cell has a simple format known as the KeyValue, shown below.

row key : column family : column qualifier : timestamp : value

Data is stored in the file cell by cell, ordered by the byte value of each segment. During a query, the user can specify selection criteria using any of the KeyValue segments; the earlier the segment appears in the key, the better the query performs. Queries based on values are therefore the least efficient.


This diagram illustrates that each column family is stored in its own file, to maximize the read and write speed of a single column family. It also illustrates the technique of shifting cell values into earlier key positions for faster query response.

The most efficient way to retrieve HBase data is through a scan, in which start and stop rows are specified. Because row keys are already stored in sorted order, a scan simply translates into an efficient contiguous file read. The freedom to design the row key as arbitrary byte data gives the designer tremendous leeway for creative schemas. A composite key, which lumps several application concerns together into the row key, makes partial key scans possible: the user can narrow in on a data set by specifying a range over the leading part of the composite key.
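
As a minimal sketch (the composite key layout is an assumption, not the project's actual schema), a partial key scan over a hypothetical "<customerId>|<timestamp>" row key could look like this:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{HTable, Scan}
import org.apache.hadoop.hbase.util.Bytes

// Row keys of the form "<customerId>|<timestamp>": scanning from the customer id
// prefix to the next possible prefix returns only that customer's rows as one
// contiguous read.
val hbaseConf = HBaseConfiguration.create()
val historyTable = new HTable(hbaseConf, "cchistory")
val scan = new Scan(Bytes.toBytes("G5XZW85212|"), Bytes.toBytes("G5XZW85212}"))
scan.addFamily(Bytes.toBytes("transactions"))
val scanner = historyTable.getScanner(scan)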

HDFS


HDFS is the file system built to run Hadoop map reduce jobs. It redundantly stores the content of large files (often gigabytes in size), split into fixed-size blocks, across Hadoop data nodes according to a replication factor. A name node stores the directory metadata for the files, and a secondary name node periodically checkpoints that metadata to allow quicker recovery in the event of a primary name node crash.

A large cluster houses the Hadoop nodes on multiple racks. A good block placement policy takes into account the difference in bandwidth between intra- and inter-rack traffic, and strikes a tradeoff between minimizing write cost and maximizing data reliability, availability and aggregate read bandwidth.

The Hadoop job tracker takes advantage of data locality by assigning tasks to run on the data nodes that host the data each task requires. The same data awareness is built into other computing platforms such as Spark.

Rehash of the sequence mining algorithm

The author characterizes each credit card transaction by three prominent features:
1. Amount spent: Low, Normal or High
2. Whether the item is a high price ticket item: Normal or High
3. Time elapsed since the last transaction: Large, Normal or Small

This feature vector, combined with the customer id and transaction id, yields a stream of incoming data of the form:

G5XZW85212,JXCT97WT1AKR,LNL
2Q15KA83F3,TJE3MI8YIDA5,NNN
3H4E0847GL,7AK95UVB1MQ4,NHS
D20ID059R4,1Q8NWNUG33IM,HNL
The three features combine into 18 (3 x 2 x 3) possible states. You can therefore construct an 18 x 18 transition matrix and collect statistics on the likelihood of each state transition between two successive credit card transactions. Training the model involves collecting a relatively large number of user transactions and performing a simple aggregation over each state transition. Once a trained model is prepared, each new transaction sequence can be judged to be either within the pattern or anomalous based on how well it fits the model overall.
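
For illustration (the ordering of states in the project's configuration may differ), the 18 states can be enumerated as three-letter tokens and mapped to matrix indices like so:

// amount spent (L/N/H) x price ticket (N/H) x elapsed time (L/N/S) = 18 states
val states = for {
  amount  <- Seq("L", "N", "H")
  ticket  <- Seq("N", "H")
  elapsed <- Seq("L", "N", "S")
} yield amount + ticket + elapsed

val stateIndex = states.zipWithIndex.toMap   // e.g. stateIndex("LNL") gives that state's row/column index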

In the prediction stage, the system operators can define the size of the transaction sequence to be evaluated and choose a threshold for the determination of outliers. For example, for a window size of 5 transactions, the threshold may be set such that at least 3 highly unlikely transitions are required to trigger detection.
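
A minimal sketch of that kind of check (the names and cutoff values are illustrative, not the project's exact scorer): a window of 5 transactions yields 4 transitions, and the sequence is flagged when at least 3 of them fall below a probability cutoff.

def isSuspicious(sequence: Array[String],
                 transitionProb: Array[Array[Double]],
                 stateIndex: Map[String, Int],
                 cutoff: Double = 0.05,
                 missThreshold: Int = 3): Boolean = {
  // count consecutive transitions whose model probability is below the cutoff
  val misses = sequence.sliding(2).count { pair =>
    transitionProb(stateIndex(pair(0)))(stateIndex(pair(1))) < cutoff
  }
  misses >= missThreshold
}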

Code Review


There are a number of interesting code patterns that deserve special mention. First, notice that this project is a mix of Scala and Java code. Any time a SparkContext is involved, the author opted to code in Scala, which is Spark's native language. Scala code uses JDK classes seamlessly, so there are no particular concerns arising from this mix.

RDD


Spark works its magic through the RDD (Resilient Distributed Dataset). A standard Spark job starts by converting ordinary data into an RDD. An example can be found in TrainingApp.

val dataRdd = sparkContext.parallelize(dataFeed, 2)

From this point, all the Spark methods can be applied one step at a time down the processing pipeline.

  val xactionByCustomer = dataRdd.map {
    line =>
      {
        val parts = line.split(",")
        (parts(0), parts(2)) // drop transaction id
      }
  }.reduceByKey((previous, current) => previous + "," + current)
  .map {
    tuple =>
      val line = tuple._1 + "," + tuple._2
      line
  }

In this pipeline, each line of the input RDD is first split into three fields, and the unused second field (the transaction id) is discarded. Next, a reduce-by-key step concatenates all of a customer's transaction tokens into one long comma-delimited sequence.

Streaming RDD


In the streaming case, the equivalent abstraction is called a DStream. As mentioned before, Spark has a very convenient interface to Kafka, KafkaUtils, which converts Kafka data into a DStream.

val messages = KafkaUtils.createStream[String, SingleTransaction, StringDecoder, SingleTransactionDecoder](streamingContext, kafkaParams, topics, StorageLevel.MEMORY_ONLY)

The first argument, streamingContext, carries the parameters that define the streaming computation, most importantly the micro-batch interval (in seconds).

The incoming data feed is automatically deserialized into a custom class called SingleTransaction. In the following statement, the Spark code creates a (key, tokens) tuple per transaction; additional reduction steps then assemble the per-customer transaction sequences, which can be scored by the chosen scorer.

val xactionByCustomer = messages.map(_._2).map {
    transaction =>
      val key = transaction.customerId
      var tokens = transaction.tokens
      (key, tokens)
  }

RDD Aggregation

During the model training process, we want to add up the occurrences of every state-to-state transition. This can be done with the four-step process shown below.

   val transitionMatrix = sequences.map {
      line =>
        val userId = line._1
        val records = line._2
        // For every pair of consecutive states, emit a transition key of the
        // form "sourceIndex-destinationIndex"; the (key, 1) pairing and the
        // count aggregation happen in the steps chained below.

        var stateCount = ArrayBuffer[String]()
        val numberRecords = records.length

        for (i <- 0 to numberRecords - 2) {
          val statesV = statesVar.value
          val srcIndex = statesV.indexOf(records(i))
          val destIndex = statesV.indexOf(records(i + 1))
          val key = srcIndex + "-" + destIndex

          stateCount += key
        }
        stateCount.toList
    }
      .flatMap(c => c)       // flatten the per-customer key lists into individual keys
      .map(c => (c, 1))      // pair each transition key with a count of 1
      .reduceByKey(_ + _)    // add up the counts for each distinct transition
In step one, the sequence of tokens is separated from the customer id, and each pair of consecutive tokens is turned into a "sourceIndex-destinationIndex" key. The RDD coming out of this step holds a list of these keys per customer. The flatMap step breaks each list into individual keys, and the next step pairs each key with the number 1. Lastly, the reduceByKey step adds up all the 1's for each key, yielding the cumulative count of each transition. After normalization, these counts become the transition probabilities between any two states.
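
As a rough sketch of that normalization (not the project's exact MarkovModelTrainer code), the counts can be divided by their per-source-state totals to obtain row-normalized probabilities before the model is written to HDFS:

// total number of transitions leaving each source state
val rowTotals = transitionMatrix
  .map { case (key, count) => (key.split("-")(0), count) }
  .reduceByKey(_ + _)
  .collectAsMap()

// divide each transition count by its row total to get P(destination | source)
val transitionProb = transitionMatrix.map {
  case (key, count) =>
    val src = key.split("-")(0)
    (key, count.toDouble / rowTotals(src))
}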

HBase interface


When interfacing Spark with HBase, one concern is the economy of database connections. In a Spark job, a large dataset can potentially be composed of millions of individual tuples, and if each tuple triggered a new HBase connection, the connection setup alone would overwhelm the system. Here we take advantage of the RDD.foreachPartition() method, which reduces the number of connections to one per RDD partition rather than one per record.

      tuple.foreachPartition {
        iterator =>

          // read prev windowSize -1 transactions from hbase and prepend to new token list
          val conf = HBaseConfiguration.create();
          val table = new HTable(conf, "cchistory");
          iterator.foreach {
            tuple =>
              val key = tuple._1
              var tokens = tuple._2
              val get = new Get(Bytes.toBytes(key));
              get.addFamily(Bytes.toBytes("transactions"));
              val result = table.get(get);

Once the HBase connection is opened, the code iterates through each transaction sequence, picks up the customer id, and looks up any historical sequence for the same customer in the cchistory table.
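
As an illustrative continuation (the column qualifier name here is an assumption), the stored token history can be pulled out of the Result and prepended to the new tokens so that the assembled sequence reaches the configured window size:

val family = Bytes.toBytes("transactions")
val qualifier = Bytes.toBytes("tokens")   // hypothetical qualifier name
val history = Option(result.getValue(family, qualifier)).map(b => Bytes.toString(b)).getOrElse("")
val assembled = if (history.isEmpty) tokens else history + "," + tokens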

Scorer subclassing


There are three scorers proposed by Pranab. Here we implement them as subclasses of a common trait, "Scorer". The factory pattern is implemented through the apply() method of the Scala object of the same name; apply() is invoked implicitly when the OutlierDetector class calls Scorer(...) as though it were a constructor.

val scorer = Scorer(algorithm, stateTranstionProb, states, stateSeqWindowSize)

The algorithm name, passed in as the string s, serves as the discriminator.

  def apply(s: String, stateTranstionProb: Array[Array[Double]], states: Array[String], stateSeqWindowSize: Int): Scorer = {
    if (s == "MissProbability") {
      return new MissProbability(stateTranstionProb, states, stateSeqWindowSize)
    } else if (s == "MissRate") {
      return new MissRate(stateTranstionProb, states, stateSeqWindowSize)
    } else {
      return new entropyReduction(stateTranstionProb, states, stateSeqWindowSize)
    }
  }
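
Because the discriminator is just a string, the scorer choice can come straight from the configuration file; for example (reusing the variable names from the signature above, with the documented default window size of 5):

// swap in "MissProbability" or the entropy-based scorer by changing configuration, not code
val scorer: Scorer = Scorer("MissRate", stateTranstionProb, states, 5)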

Summary


There is a healthy collection of clusterizable tools in the big data ecosystem. At the center of the collection is a high performance, programmable platform that allows a competent coder to create custom solutions with distributed execution bundled in for free. Spark is an in-memory, general-purpose data processing engine that offers a multi-language API and ready integration with many supporting infrastructure tools.
This open source project demonstrates how a novel fraud detection algorithm over real time streaming data can be implemented with them.

About Bruce Ho 


Bruce Ho is a big data enthusiast with a special interest in in-memory and streaming computing. He devoted the past year to implementing predictive modeling and machine learning using Spark and NoSQL technology. He is currently creating a data pipeline that delivers real time user profile analysis for optimizing ad campaigns. Look out for more of Bruce's upcoming open source projects. He is an MIT/Caltech trained physicist who later picked up Java architecture and big data. Bruce has worked in the software industry for over 10 years at companies including Life Technologies, Teradata, and Amazon.

5 comments:

  1. Great post! Analyzing big data allows analysts, researchers, and business users to make better and faster decisions using data that was previously inaccessible or unusable. Using advanced analytics techniques such as text analytics, machine learning, predictive analytics, data mining, statistics, and natural language processing, businesses can analyze previously untapped data sources independent or together with their existing enterprise data to gain new insights resulting in significantly better and faster decisions.

    Thanks
    Big Data analytics

  2. Can you please help me to run this program ?

    Thanks!

  3. Please post your questions, so that everybody could benefit.

  4. Hi,
    I am getting below error while submitting the Trainer App.

    Exception in thread "main" java.io.IOException: Mkdirs failed to create /data/streaming_analysis (exists=false, cwd=file:/home/ubuntu/spark-1.6.2-bin-hadoop2.6)

    In total there should be three files created:
    1./data/streaming_analysis/markovmodel.txt
    2./data/streaming_analysis/training_transaction
    3./data/streaming_analysis/training_sequence

    I can see that the 2nd and 3rd files have been generated in HDFS without any error. The first file is not getting created. Please have a look into it.

    Thanks!

  5. Hi, thanks for the post.
    I am trying my best to understand the complete stuff.
    So far I am clear about the 3 files generated in HDFS by TrainerApp.scala, but I am not able to understand the logic of the probabilities generated in the "markovmodel.txt" file. I tried to put logs in my setup, but in MarkovModelTrainer.generateModel() I could not follow the logic of the line
    val prob = x._2.toDouble / normalizationV(srcDest(0))
    I can see the prob values in HDFS properly, but I wanted to know the logic behind this prob generation, and also the logic of the prediction. Can someone please explain?
    Thanks
    Suman
