This is Part 1 of this blog series. Additional details are provided in Part 2.
In this blog post, I'll describe an architecture for performing near real-time streaming anomaly detection on IoT data. We'll use Apache Kafka, Apache Spark, Apache Flume, Apache Zeppelin, and EMC Elastic Cloud Storage (ECS) to build a scalable, geo-distributed, and highly reliable system. The diagram below shows the key components of our system.
To be specific about what "near real-time" means in this context: the system will generate an alert within about 5 seconds of a device producing an anomalous message.
The Internet of Things
We start with IoT devices such as smartphones, thermostats, cars, and industrial machines. These may be all over the world and will generally send real-time streaming data to a nearby web service accessible over the Internet. To reduce the traffic between continents (or perhaps to comply with privacy laws), we'll have data centers in multiple regions throughout the world. Each data center will host multiple instances of this web service. When the web service receives a message, it may perform some very basic tasks (e.g. authentication and authorization) and then simply forwards the message to a Kafka topic.
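To make the web service's role concrete, here is a minimal sketch of the "authenticate, then forward" step. Everything named here is an assumption for illustration: the token table, the `raw-messages` topic name, the status codes, and the `InMemoryTopic` class, which stands in for a real Kafka producer so the sketch runs without a broker.

```python
import json
import time

# Hypothetical token table; a real service would call an auth provider.
VALID_TOKENS = {"device-123": "secret-token"}


class InMemoryTopic:
    """Stand-in for a Kafka producer (assumption, not a real client)."""

    def __init__(self):
        self.records = []

    def send(self, topic, key, value):
        # A real producer would hand the record to the Kafka cluster here.
        self.records.append((topic, key, value))


def handle_device_message(producer, device_id, token, payload,
                          topic="raw-messages"):
    """Authenticate the device, then forward its message to a topic."""
    if VALID_TOKENS.get(device_id) != token:
        return 401  # reject unauthenticated devices; nothing is forwarded
    record = {
        "device_id": device_id,
        "received_at": time.time(),  # when the web service saw the message
        "payload": payload,
    }
    producer.send(topic,
                  key=device_id.encode(),
                  value=json.dumps(record).encode())
    return 202  # accepted for asynchronous processing
```

The web service deliberately does no enrichment or scoring of its own. It stays thin so that it can be scaled out easily, and all of the heavier work happens downstream of Kafka.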
Apache Kafka, in short, is a distributed and reliable message bus. Each data center will have its own Apache Kafka cluster to ensure functionality in the event of a communication failure between data centers. Apache Kafka will replicate the messages among its nodes to ensure that they are protected from hardware or other failures.
Spark Streaming - Real-Time Anomaly Detection
To perform the actual anomaly detection (or many other predictive machine learning tasks), we'll use Spark Streaming. Spark Streaming functions as a complex event processor. Unlike a traditional rules engine, it can be used to perform complex data enrichment and evaluate state-of-the-art machine learning models such as Random Forests and artificial neural networks.
Our Spark Streaming job will do quite a few things.
- First, our Spark Streaming job will pull the messages from our Kafka topic and create mini-batches of messages every few seconds.
- For each mini-batch, our job will enrich the data as needed (e.g. join with other data sets, perform geo-location, calculate fields).
- We will want the enriched data to be persisted to reliable storage. Although Spark can write directly to Hadoop-compatible storage, doing so would result in many small files (one per batch per partition). To avoid this, the Spark Streaming job will send the enriched data to a different Kafka topic where it will be consumed by Flume. Flume will then reliably create very large files, flushing them periodically to ensure the file system has the latest data. We'll use ECS for this purpose because it gives us protection from the failure of a single node as well as the failure of an entire data center.
- The enriched data will then be transformed into features (e.g. convert all categorical data and words into real numbers).
- For each message, the features will be used as the input into our machine learning model and the output will reflect whether the message is predicted to be an anomaly. (The machine learning model will be trained separately. Keep reading.)
- If an anomaly is predicted, then the job will send an alert. This can simply be an alert message sent to a different Kafka topic that an outside alerting system monitors.
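The steps above can be sketched in plain Python to show how one mini-batch flows through the job. This is not Spark code and not the actual job: the lookup table, field names, topic names, and the fixed-threshold "model" are all hypothetical placeholders, and `send` stands in for a Kafka producer.

```python
import json

# Hypothetical reference data used for enrichment.
DEVICE_REGIONS = {"thermostat-1": "eu-west", "car-7": "us-east"}
# Hypothetical categories used for one-hot encoding.
DEVICE_TYPES = ["thermostat", "car", "phone"]


def enrich(message):
    """Join the message with reference data (here, a region lookup)."""
    enriched = dict(message)
    enriched["region"] = DEVICE_REGIONS.get(message["device_id"], "unknown")
    return enriched


def to_features(enriched):
    """Turn categorical fields into numbers and append the reading."""
    one_hot = [1.0 if enriched["device_type"] == t else 0.0
               for t in DEVICE_TYPES]
    return one_hot + [float(enriched["reading"])]


def is_anomaly(features, threshold=100.0):
    """Placeholder for the trained model: flags large readings."""
    return features[-1] > threshold


def process_batch(messages, send):
    """Process one mini-batch: enrich, persist, featurize, score, alert."""
    for message in messages:
        enriched = enrich(message)
        # The enriched record goes to its own topic for Flume to consume.
        send("enriched", json.dumps(enriched))
        if is_anomaly(to_features(enriched)):
            alert = {"device_id": enriched["device_id"],
                     "reading": enriched["reading"]}
            send("alerts", json.dumps(alert))
```

In the real job, each of these functions would be applied to the distributed mini-batch rather than to a Python list, and `is_anomaly` would evaluate the trained model.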
EMC Elastic Cloud Storage (ECS)
Now we have our enriched data in multiple files in a single geo-distributed ECS bucket. The primary copy of each object (file) will physically remain within the data center where it originated. Other data centers throughout the world will contain exactly enough parts of the erasure-coded objects so that an object can be rebuilt in the event of a failure of any single data center. Specifically, Reed-Solomon erasure coding is used within a data center and parity (XOR) is used between data centers. This greatly reduces the storage requirement but retains a high degree of data protection. Note that the geo-distributed protection is an asynchronous process, so we assume that losing a few seconds of data is an acceptable trade-off for lower write latency.
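To see why XOR parity between data centers is enough to survive the loss of one site, here is a toy illustration. It shows only the general XOR idea with two made-up chunks and a third site holding their parity. It is not a description of how ECS actually lays out or sizes its chunks.

```python
def xor_bytes(a: bytes, b: bytes) -> bytes:
    """Byte-wise XOR of two equal-length chunks."""
    if len(a) != len(b):
        raise ValueError("chunks must have the same length")
    return bytes(x ^ y for x, y in zip(a, b))


# Chunks whose primary copies live at two different data centers.
chunk_a = b"site-A-data!"
chunk_b = b"site-B-data?"

# A third data center stores only the parity of the two chunks.
parity = xor_bytes(chunk_a, chunk_b)

# If the first data center is lost, its chunk is rebuilt from the survivors.
rebuilt_a = xor_bytes(parity, chunk_b)
assert rebuilt_a == chunk_a
```

In this toy layout, three chunks are stored to protect two chunks of data, a 1.5x overhead, compared with 3x if every site held a full copy of both.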
If you were to browse this ECS bucket (e.g. hadoop fs -ls), you would see files created from the Spark Streaming job running at each data center. A request to an ECS node in one data center to read a file whose primary copy is at another data center will result in a read across your WAN. However, subsequent requests for the same file will often use a locally cached copy of the object, eliminating the full WAN read. In either case, ECS will guarantee that you are reading the latest version of the file.
These features of ECS make it extremely simple to run high-performance analytics jobs across the entire global data set. You only need to point your job to read the files in a single directory and ECS will take care of the WAN transfer, local caching, cache invalidation, consistency, disaster recovery, and high availability.
Spark - Machine Learning Model Training
Returning to anomaly detection, we will use the entire global data set to train our machine learning model. To do this, we'll create a Spark batch job that executes at a single data center and does the following.
- Read the enriched data that was placed in our ECS bucket by the Flume job at each data center. Since ECS guarantees a consistent view of the data, this Spark job will see the entire data set up to the most recent flush by Flume.
- Train our machine learning model. A single model will be created that uses the data collected by all data centers. This step may also include cross validation and hyperparameter tuning.
- Save our model to files on ECS. Depending on the algorithm and hyperparameters we choose, the size of our model on disk may be a few KB to several GB.
- Inform each of our Spark Streaming jobs to use the newly trained model saved on ECS.
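The last two steps, saving the model and telling the streaming jobs to pick it up, can be done in several ways. Below is a minimal sketch of one possible approach, which is my assumption rather than a description of the actual mechanism: the batch job writes a versioned model file and then updates a small `LATEST` pointer file, and each streaming job checks that pointer between mini-batches. A local directory stands in for the ECS bucket, and the mean/standard-deviation "model" is a toy placeholder for a real trained model.

```python
import json
import os
import statistics


def train_model(readings):
    """Toy 'model': the mean and standard deviation of the readings."""
    return {"mean": statistics.fmean(readings),
            "stdev": statistics.pstdev(readings)}


def publish_model(model_dir, model, version):
    """Write the model file first, then switch the LATEST pointer to it."""
    path = os.path.join(model_dir, f"model-v{version}.json")
    with open(path, "w") as f:
        json.dump(model, f)
    # Update the pointer last so readers never see a version without a file.
    tmp = os.path.join(model_dir, "LATEST.tmp")
    with open(tmp, "w") as f:
        f.write(str(version))
    os.replace(tmp, os.path.join(model_dir, "LATEST"))
    return path


class ModelHolder:
    """Used by a streaming job; reloads when the pointer changes."""

    def __init__(self, model_dir):
        self.model_dir = model_dir
        self.version = None
        self.model = None

    def refresh(self):
        """Return True if a newer model was loaded, False otherwise."""
        with open(os.path.join(self.model_dir, "LATEST")) as f:
            latest = int(f.read())
        if latest == self.version:
            return False
        model_path = os.path.join(self.model_dir, f"model-v{latest}.json")
        with open(model_path) as f:
            self.model = json.load(f)
        self.version = latest
        return True

    def is_anomaly(self, reading, z=3.0):
        """Flag readings more than z standard deviations from the mean."""
        return abs(reading - self.model["mean"]) > z * self.model["stdev"]
```

A streaming job would call `refresh()` at the start of each mini-batch. Because the model files are versioned and never overwritten, a job that is still using an older version is unaffected while a new one is being published.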
Dashboards and Exploration with Zeppelin
Apache Zeppelin is a web-based notebook that enables interactive data analytics. It can also be used to easily create some very nice dashboards. Below we can see a list of the 1000 most recent alerts, a histogram based on the alerts, and some Kafka metrics from each site.
The dashboard can be configured to refresh itself every minute.
Zeppelin integrates directly with Spark and PySpark and it can be used interactively. The following will load a sample of 1% of the enriched data on ECS and cache it in Spark for fast interactive querying.
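As a rough sketch of what such a notebook paragraph might look like in PySpark, the function below loads the data, takes a 1% random sample, and caches it. The JSON file format, the fixed seed, and the function name are my assumptions. The `spark` argument is expected to be a SparkSession (in a Zeppelin notebook one is normally provided for you), and the path is whatever location your ECS bucket is mounted at.

```python
def load_enriched_sample(spark, path, fraction=0.01, seed=42):
    """Load a random sample of the enriched data and cache it in Spark.

    Assumes the enriched data is stored as JSON lines (an assumption).
    """
    df = spark.read.json(path)
    # Sample without replacement; the fixed seed makes reruns repeatable.
    sample = df.sample(withReplacement=False, fraction=fraction, seed=seed)
    # cache() marks the sample to be kept in memory once first computed.
    return sample.cache()
```

Note that caching is lazy: the sample is only materialized in memory the first time a query runs against it, so the first query will be slower than the ones that follow.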
I can also show some statistics for each of the numeric columns in the enriched data.
I can also write SQL and view the results in a table or in a variety of charts.
And finally, it's extremely simple to convert any interactive notebook into a dashboard by changing the view to "report."
That's it for the core components of a typical anomaly detection pipeline for global IoT data. You may have noticed that there wasn't very much to deal with to make it "global." There was no need to develop scripts and processes around using DistCp to copy files from one data center to another (and back). We didn't need to worry about caching data locally or the harder problem of cache invalidation. We didn't need to build complex disaster recovery / high availability processes, at least as far as the data storage is concerned. ECS handled these tasks for us, allowing us to focus on more interesting things.
Although this blog post is focused on anomaly detection, most of this architecture is general purpose and can be used for a wide variety of machine learning and other global Big Data use cases.
If you want to learn more, there are a lot more details in Part 2 of this blog series.
See Part 3 of this blog series.