Category Archives: Kafka

Kafka Stream Topology Testing

In this blog post, I will explain how to test Kafka Streams topologies.

Kafka Streams topologies can be quite complex, so it is important for developers to test their code. The new kafka-streams-test-utils artifact provides the TopologyTestDriver, ConsumerRecordFactory, and OutputVerifier classes. You can add the artifact as a regular test dependency and use the test driver to exercise the business logic of your Kafka Streams application.

Add the dependency below to build.sbt:

libraryDependencies += "org.apache.kafka" % "kafka-streams-test-utils" % "1.1.0" % Test

The code example below is the well-known word count application.

import java.lang.Long

import org.apache.kafka.common.serialization._
import org.apache.kafka.common.utils.Bytes
import org.apache.kafka.streams.kstream.{KStream, KTable, Materialized, Produced}
import org.apache.kafka.streams.state.KeyValueStore
import org.apache.kafka.streams.{StreamsBuilder, Topology}

import scala.collection.JavaConverters._

class WordCountApplication {
  def countNumberOfWords(inputTopic: String,
                         outputTopic: String, storeName: String): Topology = {
    val builder: StreamsBuilder = new StreamsBuilder()
    // Read the input topic using the default (String) serdes from the config
    val textLines: KStream[String, String] = builder.stream(inputTopic)
    val wordCounts: KTable[String, Long] = textLines
      // Split each line into lower-case words
      .flatMapValues(textLine => textLine.toLowerCase.split("\\W+").toIterable.asJava)
      // Re-key each record by the word itself
      .groupBy((_, word) => word)
      // Count per word, materialized in a queryable store named storeName
      .count(Materialized.as(storeName).asInstanceOf[Materialized[String, Long, KeyValueStore[Bytes, Array[Byte]]]])
    wordCounts.toStream().to(outputTopic, Produced.`with`(Serdes.String(), Serdes.Long()))
    builder.build()
  }
}

Unit test for the above topology:

import java.util.Properties
import org.apache.kafka.common.serialization._
import org.apache.kafka.streams.StreamsConfig

trait TestSpec {
  protected val stringDeserializer = new StringDeserializer()
  protected val longDeserializer = new LongDeserializer()
  val config = new Properties()
  config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "word-count-application")
  // The broker address is required by the config but never contacted by the test driver
  config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234")
  config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass.getName)
  config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass.getName)
}

import org.apache.kafka.common.serialization.StringSerializer
import org.apache.kafka.streams.TopologyTestDriver
import org.apache.kafka.streams.state.KeyValueStore
import org.apache.kafka.streams.test.ConsumerRecordFactory
import org.scalatest.{FlatSpec, Matchers}

class WordCountSpec extends FlatSpec with Matchers with TestSpec {

  it should "count number of words" in {
    val wordCountApplication = new WordCountApplication()
    val driver = new TopologyTestDriver(wordCountApplication.countNumberOfWords("input-topic", "output-topic", "counts-store"), config)
    val recordFactory = new ConsumerRecordFactory("input-topic", new StringSerializer(), new StringSerializer())
    val words = "Hello Kafka Streams, All streams lead to Kafka"
    // Send the sentence through the topology as a single record value
    driver.pipeInput(recordFactory.create(words))
    val store: KeyValueStore[String, java.lang.Long] = driver.getKeyValueStore("counts-store")
    store.get("hello") shouldBe 1
    store.get("kafka") shouldBe 2
    store.get("streams") shouldBe 2
    store.get("lead") shouldBe 1
    store.get("to") shouldBe 1
    // Release the resources held by the driver
    driver.close()
  }
}
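To see where the expected counts in the test come from, here is a minimal plain-Scala sketch that applies the same tokenisation (lower-case, then split on "\\W+") to the test sentence without any Kafka classes. The object and method names (WordCountSketch, countWords) are made up for this illustration and are not part of the application.

```scala
object WordCountSketch {
  // Mirrors the topology's tokenisation: lower-case the text,
  // split on runs of non-word characters, then count each word.
  def countWords(text: String): Map[String, Long] =
    text.toLowerCase
      .split("\\W+")
      .groupBy(identity)
      .map { case (word, occurrences) => word -> occurrences.length.toLong }

  def main(args: Array[String]): Unit = {
    val counts = countWords("Hello Kafka Streams, All streams lead to Kafka")
    // Print the counts in alphabetical order of the words
    counts.toSeq.sortBy(_._1).foreach { case (word, n) => println(s"$word -> $n") }
  }
}
```

Because the words are lower-cased before counting, "Kafka" and "kafka" land in the same bucket, which is why the test looks up lower-case keys in the store.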

Let me explain the classes used in testing the topology.

TopologyTestDriver makes it easier to write tests that verify the behaviour of topologies created with Topology or StreamsBuilder. You can test simple topologies that have a single processor, or very complex topologies that have multiple sources, processors, sinks, or sub-topologies.

The best thing about TopologyTestDriver is that it works without a real Kafka broker, so the tests execute very quickly with very little overhead.

Using the TopologyTestDriver in tests is easy: simply instantiate the driver with a Topology (for example, the result of StreamsBuilder#build()) and a Properties config, use the driver to supply an input message to the topology, and then use the driver to read and verify any messages output by the topology.

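Although the driver doesn’t use a real Kafka broker, it does simulate a Kafka consumer and producer that read and write raw (byte[]) messages.
You can either work directly with messages whose keys and values are byte[], or use ConsumerRecordFactory and OutputVerifier to work with regular objects instead of raw bytes.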
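As a rough picture of what "raw (byte[]) messages" means for string data, the sketch below round-trips a value through UTF-8 bytes using only the Scala/Java standard library. It is a conceptual stand-in for what a string serializer and deserializer do, not the Kafka classes themselves, and the names RawBytesSketch, serialize and deserialize are hypothetical.

```scala
import java.nio.charset.StandardCharsets.UTF_8

object RawBytesSketch {
  // Conceptually what a string serializer does: encode the text as UTF-8 bytes.
  def serialize(value: String): Array[Byte] = value.getBytes(UTF_8)

  // Conceptually what a string deserializer does: decode UTF-8 bytes back into text.
  def deserialize(bytes: Array[Byte]): String = new String(bytes, UTF_8)

  def main(args: Array[String]): Unit = {
    val raw = serialize("value1")
    println(raw.mkString("[", ", ", "]")) // the bytes the simulated consumer would see
    println(deserialize(raw))             // the object view a test usually prefers
  }
}
```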

Driver Set-up:
In order to create a TopologyTestDriver instance, you need a Topology and a Properties.
The configuration needs to be representative of what you’d supply to the real topology, which means including several key StreamsConfig properties.
For example, the following code fragment creates a configuration that specifies a local Kafka broker list (which is needed but not used), a timestamp extractor, and default serializers and deserializers for string keys and values:

val props = new Properties()
props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091")
props.setProperty(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, classOf[CustomTimestampExtractor].getName)
props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass.getName)
props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass.getName)
val topology: Topology = ... // your topology, e.g. the result of StreamsBuilder#build()
val driver = new TopologyTestDriver(topology, props)

Processing messages:
Here’s how to send an input message to the topic named input-topic:

val strSerializer = new StringSerializer()
val factory = new ConsumerRecordFactory(strSerializer, strSerializer)
driver.pipeInput(factory.create("input-topic", "key1", "value1"))

TopologyTestDriver#pipeInput() sends an input message with the given key, value, and timestamp on the specified topic to the topology and then commits it. When it is called, the driver passes the input message to the source that consumes the named topic and invokes the processor(s) downstream of that source.

If your topology’s processors forward messages to sinks, your test can then consume these output messages to verify they match the expected outcome.
For example, if our topology should have generated 2 messages on output-topic-1 and 1 message on output-topic-2, then our test can obtain these messages using the TopologyTestDriver#readOutput(String, Deserializer, Deserializer) method, which reads the next record from the given topic:

val record1: ProducerRecord[String, String] = driver.readOutput("output-topic-1", strDeserializer, strDeserializer)
val record2: ProducerRecord[String, String] = driver.readOutput("output-topic-1", strDeserializer, strDeserializer)
val record3: ProducerRecord[String, String] = driver.readOutput("output-topic-2", strDeserializer, strDeserializer)

Processor state:
Some processors use Kafka state storage (StateStore), so the driver provides the generic getStateStore(store-name) method as well as store-type-specific methods, so that your tests can check the underlying state store(s) used by your topology’s processors.
In our previous example, after we supplied a single input message and checked the three output messages, our test could also check the key-value store to verify the processor correctly added, removed, or updated internal state.
Our test might have pre-populated some state before submitting the input message and verified afterwards that the processor(s) correctly updated the state.

Here is the Kafka-Streaming-Test GitHub code that I used in this blog post. In the next blog post, I will write about how to test complex topologies like joins/KTables.




Structured Streaming + Kafka Integration

In this post, I will show you how to create an end-to-end Structured Streaming pipeline. Let’s say we have the following requirement:
JSON data arrives in Kafka; we need to parse the nested JSON, flatten it, store it in a structured Parquet table, and get end-to-end fault-tolerance guarantees.

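//Step-1 Creating a Kafka Source for Streaming Queries
val rawData = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092") // placeholder broker addresses
  .option("subscribe", "topic")
  .load()

//Step-2 Transforming Data
// Requires: import org.apache.spark.sql.functions.{col, from_json}
// schema is the StructType describing the JSON payload (not shown in this post)
val parsedData = rawData
  .selectExpr("cast (value as string) as json")
  .select(from_json(col("json"), schema).as("data"))
  .select("data.*") // flatten the parsed struct into top-level columns

//Step-3 Writing Data to parquet
val query = parsedData.writeStream
  .format("parquet")
  .partitionBy("date") // assumes the parsed data has a date column
  .option("checkpointLocation", "/checkpoint")
  .start("/parquetTable") // placeholder output path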

Step-1: Reading Data from Kafka
Specify the Kafka options to configure the source, then call .load().
How to connect to the Kafka servers?
kafka.bootstrap.servers => broker1,broker2
What to subscribe to?
subscribe => topic1,topic2,topic3 // fixed list of topics
subscribePattern => topic* // dynamic list of topics
assign => {"topicA":[0,1]} // specific partitions
Where to start reading?
startingOffsets => latest (default) / earliest / {"topicA":{"0":23,"1":345}}
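The JSON values for assign and startingOffsets are easy to get wrong by hand, so here is a small sketch that builds them from Scala maps using only the standard library. The object and method names (OffsetsJsonSketch, startingOffsets, assign) are hypothetical helpers for this post, not Spark APIs, and the sketch assumes topic names contain no characters that need JSON escaping.

```scala
object OffsetsJsonSketch {
  // Renders {"topic":{"partition":offset,...},...}, the per-partition form of startingOffsets.
  def startingOffsets(offsets: Map[String, Map[Int, Long]]): String =
    offsets.toSeq.sortBy(_._1).map { case (topic, partitions) =>
      val body = partitions.toSeq.sortBy(_._1)
        .map { case (partition, offset) => "\"" + partition + "\":" + offset }
        .mkString(",")
      "\"" + topic + "\":{" + body + "}"
    }.mkString("{", ",", "}")

  // Renders {"topic":[p0,p1,...],...}, the form used by the assign option.
  def assign(partitions: Map[String, Seq[Int]]): String =
    partitions.toSeq.sortBy(_._1).map { case (topic, ps) =>
      "\"" + topic + "\":" + ps.sorted.mkString("[", ",", "]")
    }.mkString("{", ",", "}")

  def main(args: Array[String]): Unit = {
    println(startingOffsets(Map("topicA" -> Map(0 -> 23L, 1 -> 345L))))
    println(assign(Map("topicA" -> Seq(0, 1))))
  }
}
```

The resulting strings can be passed as the option values, for example .option("startingOffsets", OffsetsJsonSketch.startingOffsets(...)). In the startingOffsets JSON, Spark also accepts -2 to mean earliest and -1 to mean latest for an individual partition.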

Step-2: Transforming Data
Each row in the source(rawData) has the following schema:

Column Type
key binary
value binary
topic string
partition int
offset long
timestamp long
timestampType int

Cast the binary value to a string and name the column json:
//selectExpr("cast (value as string) as json")
Parse the JSON string and expand it into nested columns, naming the result data:
//select(from_json(col("json"), schema).as("data"))

Step-3: Writing to Parquet
Save the parsed data as a Parquet table in the given path.
Partition the files by date so that future queries on time slices of the data are fast.
Enable checkpointing by setting the checkpoint location, where the offset logs are saved:
//.option("checkpointLocation", …)
start() actually starts a continuously running StreamingQuery in the Spark cluster.

Stay tuned for next post. 🙂


Easy, Scalable and Fault-tolerant Structured Streaming from Kafka to Spark

In this blog post, I will explain Spark Structured Streaming. Let’s first talk about what Structured Streaming is and how it works.
Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. In short, Structured Streaming provides fast, scalable, fault-tolerant, end-to-end exactly-once stream processing without the user having to reason about streaming. The Spark SQL engine takes care of running the query incrementally and continuously, updating the final result as streaming data continues to arrive.

Let’s say you want a continuously running program that prints data received from Kafka to the console (just an example). Below is how to express this in Structured Streaming.

First, create a local SparkSession, the starting point of all functionalities related to Spark.

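val spark = SparkSession.builder
    .master("local[*]")
    .appName("kafka-structured-streaming") // placeholder application name
    .config("spark.executor.cores", "2")
    .config("spark.executor.memory", "4g")
    .getOrCreate()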

Next, let’s create a streaming DataFrame that represents data received from a Kafka server.

Specify one or more locations to read data from.
Built-in support for File, Kafka and Socket sources; other sources are pluggable.
Multiple sources of different types can be combined using union().
val upstream = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "test-topic")
    .option("startingOffsets", "earliest")
    .load()

This upstream DataFrame represents an unbounded table containing the streaming data. The table has seven columns: key, value, topic, partition, offset, timestamp and timestampType. Each incoming record becomes a row in the table.
The upstream DataFrame has the following columns:

key(binary)  value(binary)  topic(string)  partition(int)  offset(long)  timestamp(long)  timestampType(int)
[binary]     [binary]       "topicA"       0               345           1486087873       0
[binary]     [binary]       "topicB"       3               2890          1486086721       0

For more information, see the Spark-Kafka Structured Streaming options.

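val data = upstream.selectExpr("CAST(value AS STRING)")
val downstream = data.writeStream.format("console").start() // attach the console sink and start the query
downstream.awaitTermination()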


So now you have a transformed DataFrame with one column named "value", obtained by casting the binary value to a string, and a console sink attached to it. All data coming from Kafka will be printed to the console.
Here is an example that receives data from multiple Kafka topics and partitions the data by topic name.

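val spark = SparkSession.builder
    .master("local[*]")
    .appName("kafka-structured-streaming") // placeholder application name
    .config("spark.executor.cores", "2")
    .config("spark.executor.memory", "4g")
    .getOrCreate()
val checkpointLocation = "/tmp/temporary-" + UUID.randomUUID.toString // requires import java.util.UUID
val upstream = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "test,Airport,Airports,Carriers,Planedata")
    .option("startingOffsets", "earliest")
    .load()
    .selectExpr("topic", "CAST(value AS STRING)") // Keep only the "topic" and "value" columns

val downstream = upstream
    .writeStream
    // Partition by topic: creates one directory per topic name, e.g. topic=Airport, topic=Carriers, topic=Planedata
    .partitionBy("topic")
    .format("parquet") // assumption: the sink format is not shown in the post; any file sink works here
    .option("path", "/tmp/data")
    .option("checkpointLocation", checkpointLocation)
    .start()

downstream.awaitTermination()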


Here is the complete source code.

Basic Concepts:

A query on the input will generate the “Result Table”. Every trigger interval (say, every 1 second), new rows get appended to the Input Table, which eventually updates the Result Table. Whenever the result table gets updated, we would want to write the changed result rows to an external sink.
Note that Structured Streaming does not materialize the entire table.
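The Input Table / Result Table model can be pictured with a toy sketch in plain Scala. It is only a mental model under simplifying assumptions (a running count per word, one batch of new rows per trigger) and does not use Spark; the names ResultTableSketch and update are invented for this illustration.

```scala
object ResultTableSketch {
  // Fold one trigger's worth of new input rows into the running counts.
  // Only the aggregated state is carried forward, never the full input history.
  def update(state: Map[String, Long], newRows: Seq[String]): Map[String, Long] =
    newRows.foldLeft(state) { (acc, word) =>
      acc.updated(word, acc.getOrElse(word, 0L) + 1L)
    }

  def main(args: Array[String]): Unit = {
    // Each inner Seq holds the rows appended to the Input Table during one trigger interval.
    val batches = Seq(Seq("kafka", "spark"), Seq("kafka"), Seq("spark", "spark"))
    // scanLeft yields the Result Table after each trigger, starting from the empty table.
    val tables = batches.scanLeft(Map.empty[String, Long])(update)
    tables.tail.zipWithIndex.foreach { case (table, i) =>
      println(s"after trigger ${i + 1}: $table")
    }
  }
}
```

The point of the sketch is that each trigger only needs the previous result and the newly arrived rows, which is why the entire input table never has to be materialized.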

Input Sources: There are a few built-in sources.
File source – Reads files written in a directory as a stream of data. Supported file formats are text, csv, json, parquet.
Kafka source – Reads data from Kafka. It’s compatible with Kafka broker versions 0.10.0 or higher.
Socket source (for testing) – Reads UTF8 text data from a socket connection. The listening server socket is at the driver. Note that this should be used only for testing as this does not provide end-to-end fault-tolerance guarantees.
Rate source (for testing) – Generates data at the specified number of rows per second, each output row contains a timestamp and value.

There is a lot to explain about Structured Streaming, so I cannot cover everything in a single post, but I hope you now have a basic idea of how Structured Streaming works with Kafka.

Structured Streaming Programming Guide

Structured Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher)