Fraud is a fact of life for the financial industry. PayPal did not become one of the few dotcom survivors by remaining a pure supplier of transaction engines. While the final fraud determination is still in the hands of human experts, there has been much interest in automated processes that can siphon out suspicious activities for further scrutiny.
Given the volume of global credit card transactions, the problem falls squarely in the domain of big data. Beyond handling the data volume, financial institutions also face the technical challenge of catching fraudulent transactions as they happen. All of this points to the need for a real-time streaming analysis capability.
Open source big data technologies have been advancing by leaps and bounds recently, with the most recent push towards in-memory and streaming computation. This project creates a prototype implementation of a Markov chain based sequence mining algorithm, which combines incoming credit card transactions with the most recent transaction history and makes real-time predictions based on the likelihood of the buying pattern. It is inspired by two excellent prior open source projects: 1) Implementing a real-time data pipeline with Spark Streaming, which demonstrated the seamless coupling between Spark and Kafka for developing streaming applications, and 2) Real Time Fraud Detection with Sequence Mining, which presented a novel method for outlier detection, in turn based on Markov Chains, Classifiers and Intrusion Detection. No attempt has been made to improve on the author’s original algorithm. This project is focused solely on an alternative system design, which leverages newer technological options while completing a realistic streaming environment beyond a batch simulation.
The code in the project can be found at
System overview
The scenario
A person’s credit card charge events are bound to spread sporadically over time. In order to capture someone’s spending incidents for the purpose of outlier detection, individual events need to be collected as they occur and prepended with historical spending incidents. This necessitates the use of both a real-time stream capture device and an archive of previous events. Furthermore, a system that must handle very large data throughput at low latency requires the fastest streaming computing engine available. The author’s choice for computing this prediction algorithm is Apache Spark, which, although it appeared on the big data scene only recently, is already drawing enthusiastic notice from this fast-moving industry.
Spark couples very well with Kafka for data streaming, and with HBase for persistence. HBase is a well-established NoSQL database known for its high capacity and high I/O throughput.
The prediction pipeline
The CCStreamingFeed class generates simulated credit card charge events and sends them into the Kafka topic “real-time-cc”. Our Spark code “MarkovPredictor” picks up the streaming feed, identifies the customer for each event, finds matching historical events and assembles them into a sequence of the right size (which is configured in the “app_config.properties” file; the default size is 5). The assembled sequences are piped into the Kafka topic “user-trans-history” for monitoring purposes.
The prediction process matches the input sequence against the transition matrix in the trained model, which is stored in the HDFS file “/data/streaming_analysis/markovmodel.txt”. Any sequence deemed suspicious is piped into a third Kafka topic, “candidate-fraud-trans”. For demo purposes, this can be displayed in a Kafka console consumer. In a real system, it would be channeled into a downstream device that raises the attention of a human fraud investigator.
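As an illustration of that last hop, below is a minimal sketch of publishing one flagged sequence to the “candidate-fraud-trans” topic, assuming the classic Kafka 0.8 Scala producer API; the broker address, the FraudAlertProducer object name, and the sample customer id and tokens are illustrative only, not taken from the project code.
import java.util.Properties
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

object FraudAlertProducer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("metadata.broker.list", "localhost:9092")          // illustrative broker address
    props.put("serializer.class", "kafka.serializer.StringEncoder")

    val producer = new Producer[String, String](new ProducerConfig(props))
    // key by customer id so all alerts for one customer land in the same partition
    val alert = new KeyedMessage[String, String]("candidate-fraud-trans",
      "G5XZW85212", "G5XZW85212,HHL,HNL,HHS,LNL,HHL")
    producer.send(alert)
    producer.close()
  }
}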
The training step
During training, we enter a large number of customers into the data generator (say 500). The trainer Spark code “MarkovModelTrainer” picks up the large data stream, sorts the data by customer id and generates 500 sets of credit card spending sequences. Spark excels at grouping and aggregation. After the total number of occurrences for each state-to-state transition has been tallied and normalized, these numbers are stored in HDFS as the prediction model.
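As a sketch of that final persistence step, the following shows one way the normalized numbers could be written to the model file using the standard Hadoop FileSystem API; the HDFS path comes from this project, but the ModelWriter helper and the one-row-per-line layout are assumptions, not the project’s actual serialization format.
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object ModelWriter {
  // hypothetical helper: persist a row-normalized transition matrix to HDFS
  def save(model: Array[Array[Double]],
           path: String = "/data/streaming_analysis/markovmodel.txt"): Unit = {
    val fs = FileSystem.get(new Configuration())   // picks up core-site.xml / hdfs-site.xml
    val out = fs.create(new Path(path), true)      // overwrite any previous model
    try {
      // assumed layout: one comma-separated row of probabilities per source state
      model.foreach(row => out.writeBytes(row.mkString(",") + "\n"))
    } finally {
      out.close()
    }
  }
}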
Each of the system components is introduced next, followed by a rehash of Pranab’s algorithm.
Implementation overview
In this implementation, the stack includes four major components: Spark, HDFS, HBase and Kafka, all clusterizable open source tools. In other words, they are built for redundancy and scalability. We start with a brief introduction to the components. Further in-depth discussions of them can easily be found on the web.
Spark
Spark has its academic origin in the Berkeley AMP Lab and is charting new territory in Apache history with its unprecedented commit rate. Any industry watcher has to be well aware of its rising potential as the juggernaut of big data platforms. It incorporates a general-purpose DAG pipeline structure for data processing, leverages in-memory data caching for speed, relies on processing lineage for resiliency against node failure, and opens up development options to Java, Scala, Python, as well as R programmers. It recently released its YARN integration to maximize cluster resource usage. A companion project, MLlib, is incorporated into Spark releases; it aims to boost both the ease of use and the scalability of machine learning algorithms, and to make them accessible to those of us who are not PhD statisticians.
Spark itself is implemented in Scala. The Scala collection and functional-programming paradigm is a perfect match for Spark’s concept of the RDD (Resilient Distributed Dataset), which applies successive grouping and reduction steps to a large data set on a cluster of high-memory machines. Spark has built-in capability to detect the locality of HDFS files. A typical Spark application interacts with a variety of data sources including HDFS, HBase, Cassandra and Kafka.
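For readers new to Spark, the entry point looks roughly like the following; a minimal sketch assuming Spark 1.x, with the application name chosen arbitrarily and the model file path borrowed from this project purely for illustration.
import org.apache.spark.{SparkConf, SparkContext}

object LocalityDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("locality-demo")
    val sc = new SparkContext(conf)

    // textFile creates roughly one partition per HDFS block, and the scheduler
    // tries to run each task on a node that holds that block locally
    val model = sc.textFile("hdfs:///data/streaming_analysis/markovmodel.txt")
    println("partitions: " + model.partitions.length + ", lines: " + model.count())

    sc.stop()
  }
}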
Once Spark has been started, the user can monitor job submissions from the web page.
One particularly interesting feature of Spark is its capability for mixing an incoming stream with archived data. Spark Streaming uses a micro-batching technique in which it converts data collected over a pre-defined time window into an RDD and feeds it into the processing pipeline. The developer can dictate how rapidly to slide such a window forward in time for continuous data collection and processing.
However, it should be noted that the streaming done in this project is not based on a timed window, but rather on the sequence size. While a timed window works wonderfully for a continuous stream, credit card spending is more sporadic in nature. What this means at a programming level is that we call RDD.reduceByKey rather than RDD.reduceByKeyAndWindow, as sketched below.
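To make that distinction concrete, here is a minimal sketch contrasting the two calls, under the assumption of a hypothetical (customerId, token) pair stream arriving over a socket; the host, port, durations and object name are all illustrative.
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WindowingSketch {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("windowing-sketch"), Seconds(2))

    // hypothetical (customerId, token) pairs arriving over a socket
    val pairs = ssc.socketTextStream("localhost", 9999).map { line =>
      val parts = line.split(",")
      (parts(0), parts(1))
    }

    // per micro-batch concatenation, the style used in this project
    val perBatch = pairs.reduceByKey(_ + "," + _)

    // timed alternative: everything seen in the last 30 seconds, sliding every 10
    val perWindow = pairs.reduceByKeyAndWindow((a: String, b: String) => a + "," + b,
      Seconds(30), Seconds(10))

    perBatch.print()
    perWindow.print()
    ssc.start()
    ssc.awaitTermination()
  }
}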
One of the easiest ways to feed streaming data is to leverage the Kafka API. This brings the discussion to our second major component, Kafka, which is in so many ways a marvel in its own right.
Kafka
Kafka is a queue, a publish-subscribe middleware, a streaming source, a centralized logging framework, and much more. It comes with built-in partitioning and replication over clustered hardware, a command line interface and a multi-language API. In addition to partitioning, Kafka achieves its high performance by leveraging the high sequential write speed of modern disk drives (around 600 MB/sec), the OS page cache (no JVM garbage collection), the sendfile system call, simple reads and appends to files (O(1) latency), network packet batching, and compression of batched messages.
Messages sent into Kafka are persisted for a configurable period and then discarded. Parallelism through partitioning gives Kafka effectively constant performance with respect to data size.
Each message is tracked by a sequential id called the offset, which is kept on a per-consumer basis. Consumers can be labeled into a group. Within each group, only one consumer gets to consume a particular message. However, multiple unlabeled consumers will each receive one copy of each message independently. This consumer group concept therefore allows Kafka to fulfill the two traditional models of queuing (all consumers in one group) and publish-subscribe (no group affiliation), and any combination in between.
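In the Spark receiver used later in this project, the group membership is simply the groupId handed to KafkaUtils.createStream; the sketch below assumes the ZooKeeper-based receiver of the Spark 1.x Kafka integration, and the helper name, ZooKeeper address and example group names are illustrative.
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka.KafkaUtils

object ConsumerGroups {
  // receivers sharing a groupId split the topic between them (queue semantics);
  // a receiver with a different groupId gets its own full copy of every message
  // (publish-subscribe semantics)
  def attachConsumer(ssc: StreamingContext, groupId: String) =
    KafkaUtils.createStream(
      ssc,
      "localhost:2181",            // illustrative ZooKeeper quorum
      groupId,                     // e.g. "fraud-monitors" vs "audit-archive"
      Map("real-time-cc" -> 1),    // topic from this project, one receiver thread
      StorageLevel.MEMORY_ONLY)
}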
The producer, on the other hand, works with a cluster by picking which partition to write to, either in a round-robin fashion or based on some semantic rule.
Kafka has been applied as a high-throughput replacement for messaging brokers (e.g. RabbitMQ), and for web activity tracking, metric or log aggregation, event sourcing, and commit logs. For the purpose of this article, the key advantage is its close integration with Spark in the form of the KafkaUtils class. More on this later.
HBase
HBase is a column family database, which most commonly uses HDFS as its persistence layer. It is linearly scalable in performance and storage capacity, making it an obvious candidate in big data use cases. The data set is partitioned across multiple region servers based on row keys.
HBase, and NoSQL databases in general, cannot be understood in terms of the usual SQL database abstractions. Its high read/write efficiency is a direct result of leveraging contiguous file access (vis-à-vis Kafka). Whether designing the schema or using HBase during operations, it helps to visualize how the data exists as files.
The HBase data cell has a simple format known as a KeyValue, shown below.
row key : column family : column qualifier : timestamp : value
Data is stored in the file cell by cell, ordered by the byte value of each segment. During a query, the user can specify the selection criterion using any of the KeyValue segments. The more leading segments a query specifies, the bigger the performance gain; queries based on values alone are therefore the least efficient.
This diagram illustrates that each column family is stored
in its own file to maximize read and write speed of a single column family. It
also illustrates the technique of shifting cell values up in position for
faster query response.
The most efficient way to retrieve HBase data is through a scan, in which start and stop rows are specified. The pre-existing ordering of the row key means a scan simply translates to an efficient contiguous file read. The freedom to design the row key as arbitrary byte data gives the designer tremendous leeway for creative designs. A composite key design, which lumps different application concerns together into the row key, makes partial key scans possible. The user can narrow in on a data set by specifying a range over any one part of the composite key.
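A minimal sketch of such a partial key scan is shown below, assuming the HBase 0.98/1.x client API, the “cchistory” table and “transactions” column family used in this project, and a purely hypothetical composite row key of the form customerId|yyyyMM.
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{HTable, Scan}
import org.apache.hadoop.hbase.util.Bytes

object PartialKeyScan {
  def main(args: Array[String]): Unit = {
    val table = new HTable(HBaseConfiguration.create(), "cchistory")

    // hypothetical composite key customerId|yyyyMM: scanning on the customer id
    // prefix alone returns all months for that customer as one contiguous read
    val scan = new Scan()
    scan.setStartRow(Bytes.toBytes("G5XZW85212|"))
    scan.setStopRow(Bytes.toBytes("G5XZW85212|~"))   // '~' sorts after the alphanumeric suffixes
    scan.addFamily(Bytes.toBytes("transactions"))

    val scanner = table.getScanner(scan)
    try {
      var result = scanner.next()
      while (result != null) {
        println(Bytes.toString(result.getRow))
        result = scanner.next()
      }
    } finally {
      scanner.close()
      table.close()
    }
  }
}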
HDFS
HDFS is a file system built to run Hadoop map-reduce jobs. It redundantly stores file content in large blocks (64 to 128 MB by default) across Hadoop data nodes according to a replication factor. A name node stores the directory metadata for the files. A secondary name node takes periodic snapshots of the file system metadata to allow quicker recovery in the event of a primary name node crash.
A large cluster houses the Hadoop nodes on multiple racks. A good block placement policy takes into account the difference in bandwidth between intra- and inter-rack traffic, and provides a tradeoff between minimizing the write cost and maximizing data reliability, availability and aggregate read bandwidth.
The Hadoop job tracker takes advantage of data locality by assigning tasks to run on the data nodes that host the data each task requires. The same data awareness is built into other computing platforms such as Spark.
Rehash of the sequence mining algorithm
The author characterizes each credit card transaction by its
three prominent features:
1. Amount spent: Low, Normal and High
2. High vs normal price ticket item: Normal and High
3. Time elapsed since the last transaction: Large, Normal and Small
This feature vector, combined with the customer id and transaction id, yields a stream of incoming data of the form:
G5XZW85212,JXCT97WT1AKR,LNL
2Q15KA83F3,TJE3MI8YIDA5,NNN
3H4E0847GL,7AK95UVB1MQ4,NHS
D20ID059R4,1Q8NWNUG33IM,HNL
The feature vector can take 18 distinct values (3 x 2 x 3), each of which is treated as a state. One can therefore construct an 18 x 18 state matrix and collect statistics on the likelihood of any state transition between two successive credit card transactions. The training step of the model involves collecting a relatively large number of user transactions and performing a simple aggregation over each state transition. Once a trained model is prepared, each new transaction sequence can be judged to be either within the pattern or anomalous based on how well it fits the model overall.
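A minimal sketch of how the 18 states might be enumerated and a token mapped to its matrix index is given below; the letter codes follow the feature definitions above, but the ordering and the States object are illustrative and need not match the project’s own state list.
object States {
  // amount spent: Low/Normal/High, ticket price: Normal/High, time since last: Large/Normal/Small
  val amounts = Seq("L", "N", "H")
  val tickets = Seq("N", "H")
  val gaps    = Seq("L", "N", "S")

  // 3 x 2 x 3 = 18 states such as "LNL", "NNN", "HHS"
  val all: Seq[String] = for (a <- amounts; t <- tickets; g <- gaps) yield a + t + g

  // row/column of a token in the 18 x 18 matrix, e.g. States.index("LNL")
  def index(token: String): Int = all.indexOf(token)
}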
In the prediction stage, the system operators can define the size of the transaction sequence to be evaluated and choose a threshold for the determination of outliers. For example, for a window size of 5 transactions, a threshold may be set such that at least 3 highly unlikely transitions are required to trigger the detection.
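The sketch below spells out that thresholding rule, assuming a window of state indices and the trained matrix of transition probabilities; the 0.05 cutoff and the ThresholdRule helper are illustrative stand-ins, not the project’s actual scorer.
object ThresholdRule {
  // flag a window when at least `minMisses` of its consecutive transitions
  // fall below the probability cutoff
  def isSuspicious(window: Seq[Int],
                   transitionProb: Array[Array[Double]],
                   cutoff: Double = 0.05,
                   minMisses: Int = 3): Boolean = {
    val misses = window.sliding(2).count {
      case Seq(src, dst) => transitionProb(src)(dst) < cutoff
      case _             => false   // window shorter than two transactions
    }
    misses >= minMisses
  }
}
With the default arguments, a window of 5 transactions (4 transitions) is flagged as soon as 3 of those transitions fall below the cutoff.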
Code Review
There are a number of interesting code patterns that deserve special mention. First, notice that this project is a mix of Scala and Java code. Any time a SparkContext is involved, the author opted to code in Scala, which is the native language of Spark. Scala code embeds JDK classes seamlessly, so there are no particular concerns arising from this mix.
RDD
Spark works its magic using the RDD (Resilient Distributed Dataset). The standard Spark process starts by converting ordinary data into an RDD. An example can be found in TrainingApp.
val dataRdd = sparkContext.parallelize(dataFeed, 2)
From this point, all the Spark methods can be applied one
step at a time down the processing pipeline.
val xactionByCustomer = dataRdd.map {
  line => {
    val parts = line.split(",")
    (parts(0), parts(2))   // drop transaction id
  }
}.reduceByKey((previous, current) => previous + "," + current)
 .map {
   tuple =>
     val line = tuple._1 + "," + tuple._2
     line
 }
In this pipeline, each line of the input RDD is first split into three parts, and the unused second part (the transaction id) is discarded. Next, a reduceByKey step concatenates all the transaction tokens for each customer into one long sequence, delimited by commas.
Streaming RDD
In the streaming case, the RDD is called a DStream. As mentioned before, Spark has a very convenient interface to Kafka, called KafkaUtils, which converts Kafka data into a DStream.
val messages = KafkaUtils.createStream[String, SingleTransaction, StringDecoder,
  SingleTransactionDecoder](streamingContext, kafkaParams, topics, StorageLevel.MEMORY_ONLY)
The first argument, streamingContext, carries the parameters that define the streaming RDD, most notably the micro-batch interval (in seconds) at which new RDDs are produced.
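For reference, such a context is typically constructed as in the short sketch below, assuming Spark Streaming 1.x; the two-second batch interval and the application name are illustrative.
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sparkConf = new SparkConf().setAppName("MarkovPredictor")
// every micro-batch gathers two seconds of incoming Kafka events into one RDD
val streamingContext = new StreamingContext(sparkConf, Seconds(2))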
The incoming data feed is automatically deserialized into a custom class called SingleTransaction. In the following statement, the Spark code creates a (key, tokens) tuple, from which additional reduction steps assemble the per-customer transaction sequences, which can then be scored by the chosen scorer.
val xactionByCustomer = messages.map(_._2).map {
  transaction =>
    val key = transaction.customerId
    val tokens = transaction.tokens
    (key, tokens)
}
RDD Aggregation
During the model training process, we want to add up the occurrences of every state-to-state transition. This can be done in a four-step process.
val transitionMatrix = sequences.map {
  line =>
    val userId = line._1
    val records = line._2
    // each line goes into a loop
    // every consecutive pair transforms into a (key, value) pair
    // transition key = "sourceIndex-destinationIndex"
    // value = 1
    var stateCount = ArrayBuffer[String]()
    val numberRecords = records.length
    for (i <- 0 to numberRecords - 2) {
      val statesV = statesVar.value
      val srcIndex = statesV.indexOf(records(i))
      val destIndex = statesV.indexOf(records(i + 1))
      val key = srcIndex + "-" + destIndex
      stateCount += key
    }
    stateCount.toList
}
.flatMap(c => c)
.map(c => (c, 1))
// reduce by key, add values
.reduceByKey(_ + _)
In step one, the sequence of tokens is separated from the customer id, and each pair of consecutive tokens is turned into a composite key of the form "sourceIndex-destinationIndex". The RDD coming out of the first step holds a list of these keys per customer. The flatMap step breaks each list into individual keys, and the next step pairs each key with the number 1. Lastly, the reduceByKey step adds up all the 1’s for each key, resulting in the cumulative count of each transition. After normalization, these counts become the transition probabilities between any two states, as sketched below.
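The only remaining training step is the normalization that turns these counts into probabilities; the sketch below assumes the “srcIndex-destIndex” keys produced above, while the sample counts and the Normalize object are made up for illustration.
object Normalize {
  def main(args: Array[String]): Unit = {
    val numStates = 18
    // illustrative output of the reduceByKey step: "srcIndex-destIndex" -> count
    val counts = Map("0-1" -> 4, "0-5" -> 1, "3-3" -> 7)

    val matrix = Array.fill(numStates, numStates)(0.0)
    for ((key, c) <- counts) {
      val Array(src, dst) = key.split("-").map(_.toInt)
      matrix(src)(dst) = c.toDouble
    }

    // divide each row by its total so every row sums to 1 (a stochastic matrix)
    val model = matrix.map { row =>
      val total = row.sum
      if (total > 0.0) row.map(_ / total) else row
    }
    println(model(0).mkString(","))   // row 0 becomes 0.0,0.8,0.0,0.0,0.0,0.2,...
  }
}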
HBase interface
When it comes to interfacing Spark with HBase, we have a concern about the economy of database connections. In a Spark job, a large dataset can potentially be composed of millions of individual tuples. If each tuple triggered a new HBase connection, the connection opening step alone would overwhelm the system. Here we can take advantage of the RDD.foreachPartition() method, which reduces the number of connections to one per RDD partition rather than one per tuple.
tuple.foreachPartition {
  iterator =>
    // open one HBase connection per partition, then read the previous
    // windowSize - 1 transactions and prepend them to the new token list
    val conf = HBaseConfiguration.create()
    val table = new HTable(conf, "cchistory")
    iterator.foreach {
      tuple =>
        val key = tuple._1
        var tokens = tuple._2
        val get = new Get(Bytes.toBytes(key))
        get.addFamily(Bytes.toBytes("transactions"))
        val result = table.get(get)
        // ... prepend the retrieved history to tokens and score the sequence
    }
    table.close()
}
Once the HBase connection is opened, the code goes through each transaction sequence, picks up the customer id, and looks for any historical sequence for the same customer in the cchistory table.
Scorer subclassing
There are three scorers proposed by Pranab. Here we implement them as subclasses of the same trait, “Scorer”. The factory pattern is implemented using the apply() method of the Scala companion object of the same name. The apply() method is automatically invoked by the constructor-style call in the OutlierDetector class.
val scorer = Scorer(algorithm, stateTranstionProb, states, stateSeqWindowSize)

def apply(s: String, stateTranstionProb: Array[Array[Double]], states: Array[String],
          stateSeqWindowSize: Int): Scorer = {
  if (s == "MissProbability") {
    return new MissProbability(stateTranstionProb, states, stateSeqWindowSize)
  } else if (s == "MissRate") {
    return new MissRate(stateTranstionProb, states, stateSeqWindowSize)
  } else {
    return new entropyReduction(stateTranstionProb, states, stateSeqWindowSize)
  }
}
Summary
There is a healthy collection of clusterizable tools in the big data ecosystem. At the center of the collection is a high-performance, programmable platform that allows a competent coder to create custom solutions, for which distributed execution is bundled in for free. Spark is an in-memory, general-purpose data processing pipeline, which offers a multi-language API and ready integration with many supporting infrastructure-layer tools.
This open source project demonstrates how a novel fraud detection algorithm involving real-time streaming data can be implemented using these tools.
About Bruce Ho
Bruce Ho is a big data enthusiast with a special interest in in-memory and streaming computing. He devoted the past year to the implementation of predictive modeling and machine learning using Spark and NoSQL technology. He is currently creating a data pipeline that delivers real-time user profile analysis for the optimization of ad campaigns. Look out for more of Bruce’s upcoming open source projects. He was formerly an MIT/Caltech-trained physicist who later picked up Java architecture and big data. Bruce has worked in the software industry for over 10 years at various companies including Life Technologies, Teradata, and Amazon.