Analyzing Kafka data streams with Spark
This blog describes a Spark Streaming application which consumes event data from a Kafka topic to provide continuous, near real-time processing and analysis of the event data stream. The demonstration application, written in Java 8 and runnable on a local host, uses a Spark direct connection to Kafka, and consumes the 911 calls as they are published to the topic. This example can be adapted to create applications capable of providing fast analysis and alerting of conditions of interest contained within a data stream.
The general use case for this application is monitoring and alerting on a data stream of events. Specifically, the example data is a set of 911 calls in the Seattle area, occurring over a number of days (the data is from Data.gov). These calls are published to a Kafka topic as CSV records. Each includes a call type (a simple categorization, such as ‘Fire’ or ‘Aid’), a timestamp, and a geospatial location. The application performs analysis of each micro batch and stateful analysis across batches, and outputs summaries of the current batch and state to a logger.
The Apache Spark project is an open source technology providing low latency, memory based, highly available, large scale analytical data processing. Spark’s performance capability and its growing base of software is driving its adoption into a variety of application areas. Spark documentation provides examples in Scala (the language Spark is written in), Java and Python. Spark also provides an API for the R language. This blog is written based on the Java API of Spark 2.0.0.
Apache Kafka is a widely adopted, scalable, durable, high performance distributed streaming platform. This example uses Kafka version 0.10.0.1.
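Building on top of the Spark core functionality, specialized libraries provide for processing in different modes: Spark SQL for structured data, Spark Streaming for data streams, MLlib for machine learning, and GraphX for graph processing.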
DStream / RDD
Spark Streaming uses an abstraction of streaming data known as a DStream (for discretized stream), which is based on the concept of a resilient distributed dataset (RDD): a fault-tolerant collection of elements which can be operated on in parallel. A DStream represents a sequence of RDDs organized on a periodic time series. Each RDD in the sequence can be considered a “micro batch” of input data, so Spark Streaming performs batch processing on a continuous basis.
DStreams can provide an abstraction of many actual data streams, among them Kafka topics, Apache Flume, Twitter feeds, socket connections, and others. This blog describes Spark consuming a Kafka topic. Being based on RDDs, DStreams offer much of the same processing as RDDs, but also provide time based processing, such as analytics that utilize sliding time windows.
Spark Direct Streaming
Spark can consume Kafka using Receivers, but Spark also includes a Kafka direct API (available for the Spark Java and Scala APIs since Spark 1.3, and for the Python API since Spark 1.4). The direct API does not use Receivers; instead, Spark acts as a direct consumer client of Kafka. The direct approach supports exactly-once processing of the Kafka messages within Spark. Full end-to-end exactly-once processing can be achieved provided that Spark’s output operations are also implemented as exactly-once. For an output store, this requires either an idempotent way to write the data, or writing the data using atomic transactions. The example application described below uses the direct API.
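One way to picture the idempotent output requirement is a store keyed by each message's Kafka coordinates, so that a replayed message overwrites its earlier copy instead of adding a duplicate. The sketch below is plain Java with an in-memory map standing in for a real database. The class and method names (IdempotentSink, write) are hypothetical and are not part of the demo application or of Spark.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration: an in-memory "table" whose primary key is the
// Kafka coordinates (topic, partition, offset) of each message.
final class IdempotentSink {
    private final Map<String, String> rows = new HashMap<>();

    // Writing the same message twice leaves exactly one row, because the
    // key depends only on where the message sits in Kafka.
    void write(String topic, int partition, long offset, String value) {
        rows.put(topic + "/" + partition + "/" + offset, value);
    }

    int size() {
        return rows.size();
    }

    public static void main(String[] args) {
        IdempotentSink sink = new IdempotentSink();
        sink.write("demo-topic", 0, 42L, "415 Cedar St,Aid Response");
        // Simulate the same message being written again after a failure.
        sink.write("demo-topic", 0, 42L, "415 Cedar St,Aid Response");
        System.out.println("rows stored: " + sink.size());
    }
}
```

With a real database the same effect is usually obtained with an upsert on a unique key, which is an assumption about the target store, not something the demo application does.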
The application code described here is the code found in the com.objectpartners.spark.rt911.streaming package, and supporting packages in com.objectpartners.spark.rt911.common.
The demonstration requires Zookeeper, Kafka and a Kafka producer client running. Spark processing is launched by the Main Application class, which starts Spark via a SparkKafkaRunner class. Spark then runs continuously, consuming and processing a Kafka topic stream and waiting for termination.
The processStream method in this class does the actual Spark processing work, starting with creation of the Spark streaming context:
// create a local Spark configuration with two working threads
SparkConf streamingConf = new SparkConf().setMaster("local[2]").setAppName("911Calls");
streamingConf.set("spark.streaming.stopGracefullyOnShutdown", "true");

// create a Spark Java streaming context, with stream batch interval
JavaStreamingContext jssc = new JavaStreamingContext(streamingConf, Durations.milliseconds(streamBatchInterval));
The second argument to the JavaStreamingContext controls the micro batch time interval, e.g. if streamBatchInterval has a value of 1000L, then the data is presented as a series of RDDs that arrive every second.
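To make the effect of the batch interval concrete, the following plain-Java sketch groups message arrival times into interval-sized buckets, which is conceptually what happens when a stream is discretized into micro batches. It is only an illustration and is not how Spark implements batching internally. MicroBatcher and its method are hypothetical names, and the arrival times are assumed to be non-negative milliseconds.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical illustration of discretizing a stream into micro batches.
final class MicroBatcher {
    // Maps each batch start time to the arrival times that fall in
    // [start, start + batchIntervalMs).
    static SortedMap<Long, List<Long>> batch(List<Long> arrivalTimesMs, long batchIntervalMs) {
        SortedMap<Long, List<Long>> batches = new TreeMap<>();
        for (long t : arrivalTimesMs) {
            long batchStart = (t / batchIntervalMs) * batchIntervalMs;
            batches.computeIfAbsent(batchStart, k -> new ArrayList<>()).add(t);
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Long> arrivals = Arrays.asList(0L, 250L, 999L, 1000L, 2500L);
        // With a 1000 ms interval the five arrivals fall into three batches.
        batch(arrivals, 1000L).forEach((start, items) ->
            System.out.println("batch starting at " + start + " ms: " + items));
    }
}
```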
To perform computation on state, it is necessary to have checkpointing enabled in the application. This is because the computation will use the previous state values. For demo purposes, a local temp directory works.
// set checkpoint for demo
jssc.checkpoint(System.getProperty("java.io.tmpdir"));
The direct connection to the Kafka topic is established with the following code:
// create a Discretized Stream of Java String objects
// as a direct stream from kafka (zookeeper is not an intermediate)
JavaPairInputDStream<String, String> rawDataLines =
    KafkaUtils.createDirectStream(
        jssc,
        String.class,
        String.class,
        StringDecoder.class,
        StringDecoder.class,
        kafkaParams,
        topicsSet
    );
The String.class and StringDecoder.class are specified for both the key and value of the input, telling Spark that the Kafka message is of type <String, String> and the key and value can be decoded with the StringDecoder.
The kafkaParams and the topicsSet specify the Kafka brokers and the topics to subscribe to:
// set up receive data stream from kafka
final Set<String> topicsSet = new HashSet<>(Arrays.asList(topic));
final Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", broker);
For the demo, broker and topic are defined in the Spring configuration class.
private String topic = "demo-topic";
private String broker = "localhost:9092";
These could be read from a properties file, or some other external source in a production version.
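A minimal sketch of the properties-file option, using only java.util.Properties, might look like the following. The property keys (kafka.topic, kafka.broker) and the KafkaSettings class are assumptions made for illustration; the demo application itself hard-codes the values as shown above.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical holder for externally configured Kafka settings.
final class KafkaSettings {
    final String topic;
    final String broker;

    private KafkaSettings(String topic, String broker) {
        this.topic = topic;
        this.broker = broker;
    }

    // Reads the settings from any Reader (a FileReader in practice),
    // falling back to the demo defaults when a key is absent.
    static KafkaSettings load(Reader source) {
        Properties props = new Properties();
        try {
            props.load(source);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new KafkaSettings(
            props.getProperty("kafka.topic", "demo-topic"),
            props.getProperty("kafka.broker", "localhost:9092"));
    }

    public static void main(String[] args) {
        // A StringReader stands in for a properties file on disk.
        KafkaSettings settings = load(new StringReader(
            "kafka.topic=calls-911\nkafka.broker=broker1:9092\n"));
        System.out.println(settings.topic + " @ " + settings.broker);
    }
}
```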
With the direct Kafka connection, the Spark application reads the Kafka topic as a stream of <K, V> message pairs. Taking the value (Tuple2::_2) of each pair gives a JavaDStream<String>, in which each String is one line of CSV data.
An example csv line:
415 Cedar St,Aid Response,09/06/2013 05:44:00 AM +0000,47.618123,-122.347615,"(47.618123, -122.347615)",F130090107
These csv lines are then converted to Java domain objects representing 911 calls using the following Java Class:
which is called by
// transform to RealTime911 objects
JavaDStream<RealTime911> callData = lines.map(map911Call);
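As a rough illustration of what such a mapping involves, the plain-Java sketch below parses the example CSV line into a small value object. It is not the application's RealTime911 class: the Call911 name, its fields, and the interpretation of the seven columns (address, call type, timestamp, latitude, longitude, quoted location, incident number) are assumptions inferred from the sample line. The splitter handles commas inside double quotes but not escaped quotes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a 911 call domain object.
final class Call911 {
    final String address;
    final String callType;
    final String dateTime;
    final double latitude;
    final double longitude;
    final String incidentId;

    Call911(String address, String callType, String dateTime,
            double latitude, double longitude, String incidentId) {
        this.address = address;
        this.callType = callType;
        this.dateTime = dateTime;
        this.latitude = latitude;
        this.longitude = longitude;
        this.incidentId = incidentId;
    }

    // Splits on commas that are not inside double quotes; quotes are dropped.
    static List<String> splitCsv(String line) {
        List<String> fields = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean inQuotes = false;
        for (char c : line.toCharArray()) {
            if (c == '"') {
                inQuotes = !inQuotes;
            } else if (c == ',' && !inQuotes) {
                fields.add(current.toString());
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        fields.add(current.toString());
        return fields;
    }

    // Assumes the seven-column layout of the sample line; column 5 (the
    // quoted location) repeats the coordinates and is skipped.
    static Call911 parse(String line) {
        List<String> f = splitCsv(line);
        if (f.size() != 7) {
            throw new IllegalArgumentException("expected 7 fields, got " + f.size());
        }
        return new Call911(f.get(0), f.get(1), f.get(2),
            Double.parseDouble(f.get(3)), Double.parseDouble(f.get(4)), f.get(6));
    }

    public static void main(String[] args) {
        Call911 call = parse("415 Cedar St,Aid Response,09/06/2013 05:44:00 AM +0000,"
            + "47.618123,-122.347615,\"(47.618123, -122.347615)\",F130090107");
        System.out.println(call.callType + " at " + call.address);
    }
}
```

In a Spark job, a method like parse could be wrapped in the function passed to map, and malformed lines would need handling (for example by filtering them out) so that one bad record does not fail the batch.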
Now we have a DStream of Java domain objects to analyze with Spark. First some bad data filtering is performed, then, if ‘filterFire’ is set to true, the input is filtered to those calls that contain the text ‘fire’.
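The filtering step can be sketched with ordinary Java predicates. In the sketch below, what counts as “bad data” (a null or blank call type) and the case-insensitive match on ‘fire’ are assumptions, and CallFilters is a hypothetical name. A list of call type strings stands in for the DStream so the example runs without Spark.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical filters applied to call type strings.
final class CallFilters {
    // Assumed definition of usable data: a non-null, non-blank call type.
    static boolean isUsable(String callType) {
        return callType != null && !callType.trim().isEmpty();
    }

    // Assumed to be case-insensitive.
    static boolean mentionsFire(String callType) {
        return callType.toLowerCase(Locale.ROOT).contains("fire");
    }

    // Drops unusable entries, then optionally keeps only fire-related calls.
    static List<String> apply(List<String> callTypes, boolean filterFire) {
        Predicate<String> keep = CallFilters::isUsable;
        if (filterFire) {
            keep = keep.and(CallFilters::mentionsFire);
        }
        return callTypes.stream().filter(keep).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> types = Arrays.asList("Aid Response", "", "Auto Fire Alarm", null);
        System.out.println(apply(types, true));
    }
}
```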
Micro Batch RDD computation
The DStream “pairs” are the Java 911 call objects keyed by call type. These are then reduced to lists of call objects keyed by the call type, and returned as the “reduced” DStream. When written out, this DStream shows a summary of the current micro batch organized by call type:
4RED 2 + 1 + 1 calls: 1
4RED 2 + 1 + 1 06/12/2015 11:55:00 PM +0000 3021 Sw Bradford St (-122.3714, 47.5692)
Aid Response calls: 1
Aid Response 06/12/2015 11:52:00 PM +0000 2211 Alaskan Way (-122.3477, 47.6108)
Medic Response calls: 2
Medic Response 06/12/2015 11:58:00 PM +0000 2717 Dexter Av N (-122.3469, 47.6444)
Medic Response 06/13/2015 12:06:00 AM +0000 120 6th Av S (-122.3264, 47.6015)
Shown below is the mapping to unreduced pairs, and a mapping to pairs reduced by call type. The unreduced pairs are separated out to support stateful call type count calculations (see State Computation below).
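As a plain-Java illustration of keying calls by type and reducing them to one list per type (not the application's Spark code), the sketch below uses the Java streams groupingBy collector as a stand-in for the pair mapping and reduction on a DStream. BatchSummary is a hypothetical name, and each call is simplified to a two-element array of call type and description.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical stand-in for the per-batch summary by call type.
final class BatchSummary {
    // Each call is {callType, description}; the result maps each call type
    // to the descriptions of its calls, ordered by call type.
    static Map<String, List<String>> groupByType(List<String[]> calls) {
        return calls.stream().collect(Collectors.groupingBy(
            call -> call[0],
            TreeMap::new,
            Collectors.mapping(call -> call[1], Collectors.toList())));
    }

    public static void main(String[] args) {
        List<String[]> calls = Arrays.asList(
            new String[] {"Medic Response", "2717 Dexter Av N"},
            new String[] {"Aid Response", "2211 Alaskan Way"},
            new String[] {"Medic Response", "120 6th Av S"});
        groupByType(calls).forEach((type, items) ->
            System.out.println(type + " calls: " + items.size() + " " + items));
    }
}
```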
The log messages are created by iterating over the RDDs in the “reduced” DStream:
To output a summary of the micro batch, DStream’s most generic output operator, foreachRDD, is provided a closure that performs formatting and logging of the micro batch RDD. The closure itself executes in Spark’s driver process, while any RDD operations it invokes run on the executors. The same operator could also be used to save RDDs to a database, or send RDDs to external consumers.
For the current running total of each call type, the reducedState DStream is computed and used to show a summary of call types received so far:
---------------------------------------------------------------
State Total:
---------------------------------------------------------------
4RED 2 + 1 + 1 total received: 2
Aid Response total received: 32
Aid Response Yellow total received: 1
Auto Fire Alarm total received: 1
Automatic Fire Alarm False total received: 1
Automatic Medical Alarm total received: 1
Investigate Out Of Service total received: 1
MVI Motor Vehicle Incident total received: 2
MVI Freeway total received: 1
Medic Response total received: 10
Medic Response 7 per Rule total received: 2
Trans to AMR total received: 1
The state computation is shown below:
A stateful Spark transformation, updateStateByKey(pairFunction), updates the state with a Java lambda expression. For each call type, the lambda receives a List of the call counts from the current micro batch and an Optional holding the previous state, and returns the new state.
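The shape of such an update function can be sketched without Spark. In Spark 2.0's Java API the function passed to updateStateByKey takes the list of new values for a key and an org.apache.spark.api.java.Optional holding the previous state. The sketch below substitutes java.util.Optional so that it runs on its own, and RunningCount is a hypothetical name; it is an illustration of the running-total logic, not the application's code.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

// Hypothetical running-total update for one call type.
final class RunningCount {
    // batchCounts: counts seen for this key in the current micro batch.
    // previous: the total so far, empty the first time the key is seen.
    static Optional<Long> update(List<Long> batchCounts, Optional<Long> previous) {
        long total = previous.orElse(0L);
        for (long count : batchCounts) {
            total += count;
        }
        return Optional.of(total);
    }

    public static void main(String[] args) {
        // First batch: three calls of this type, no previous state.
        Optional<Long> state = update(Arrays.asList(1L, 1L, 1L), Optional.empty());
        // Second batch: two more calls.
        state = update(Arrays.asList(1L, 1L), state);
        // Third batch: no calls of this type, so the total is carried over.
        state = update(Collections.emptyList(), state);
        System.out.println("total received: " + state.get());
    }
}
```

Because the previous total has to survive from one micro batch to the next, this kind of computation is what makes the checkpoint directory necessary.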
These log messages are output by iterating over the reducedState DStream, similar to the code above.
Getting the Code and Running the Demo
To download or clone the project, go to the GitHub project. The project README includes instructions on building and running the project code.
This blog described Apache Spark directly consuming a Kafka topic and performing analysis using Spark’s DStream abstraction. Spark Streaming offers choices for processing and analyzing data streams. In addition to Scala and Java, Python and R language APIs are available. Furthermore, other Spark libraries can be used with Spark Streaming, including the SQL library to process the stream with SQL constructs, and the machine learning algorithms provided by Spark’s MLlib.