Tag Archives: Scala

Spark: Why should we not use inferSchema = true with Dataframe?

In this blog post, I explain why we should not use inferSchema = true. Setting it tells Spark to infer the schema automatically.

Here, the schema means the column types. A column can be of type String, Double, Long, etc.

If the schema is not specified using the schema function and the inferSchema option is enabled, Spark goes through the input once to determine the input schema. This extra pass over the data takes time.

If the schema is not specified using the schema function and the inferSchema option is disabled, Spark treats every column as a string and reads only the first line to determine the names and the number of fields.

In the examples below, I measure how long the same action takes with and without schema inference.

The time function calculates how long a block of code takes to execute.

def time[A](name: String)(body: => A): A = {
    val start = System.currentTimeMillis()
    val result = body // evaluate the block being measured
    val end = System.currentTimeMillis()
    println(s"$name Took ${end - start} millis")
    result
}
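System.currentTimeMillis reads the wall clock, which can jump if the system time is adjusted while the block runs. A minimal sketch of the same helper based on System.nanoTime, which is meant for measuring elapsed time and is not tied to the wall clock (the Timing object and the timeNanos name are illustrative, not from the original post):

```scala
import java.util.concurrent.TimeUnit

object Timing {
  // Same contract as `time`: evaluate `body` once, print the elapsed time, return its result.
  // System.nanoTime is intended for elapsed-time measurement, not for telling the time of day.
  def timeNanos[A](name: String)(body: => A): A = {
    val start = System.nanoTime()
    val result = body
    val elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)
    println(s"$name Took $elapsedMillis millis")
    result
  }
}
```

For example, Timing.timeNanos("sum")((1 to 1000).sum) prints the elapsed time and returns 500500, so the helper can wrap any expression without changing its value.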


Sample data (a CSV file with a header row):

name,department,years_of_experience,dob
Abdhesh,Software Engineer,8,1990-07-20
Shikha,Fullstak Developer,9,1989-07-02

1- Using inferSchema = true:

import java.sql.Timestamp

case class Developer(
      name: String,
      department: String,
      years_of_experience: Int,
      dob: Timestamp)

  time("inferSchema = true") {
    val developerDF = spark.read
      .option("inferSchema", "true")
      .option("header", "true")
      .option("timestampFormat", "yyyy-MM-dd")
      .csv(csvPath) // csvPath: path to the sample CSV above (not shown in the original)
    import spark.implicits._
    val developerDS = developerDF.as[Developer]
    developerDS.collect().foreach(println)
  }

Output:
Developer(Abdhesh,Software Engineer,8,1990-07-20 00:00:00.0)
Developer(Shikha,Fullstak Developer,9,1989-07-02 00:00:00.0)
inferSchema = true Took 18040 millis

2- Using explicit schema: (inferSchema = false)

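import org.apache.spark.sql.types._

time("inferSchema = false") {
    // StructType takes a Seq of fields
    val schema = StructType(Seq(
        StructField("name", StringType, false),
        StructField("department", StringType, false),
        StructField("years_of_experience", IntegerType, false),
        StructField("dob", TimestampType, false)))
    val developerDF = spark.read
      .option("header", "true")
      .option("timestampFormat", "yyyy-MM-dd")
      .schema(schema) // no inference pass: Spark uses this schema as-is
      .csv(csvPath)   // csvPath: path to the sample CSV above (not shown in the original)
    import spark.implicits._
    val developerDS = developerDF.as[Developer]
    developerDS.collect().foreach(println)
}

Output:
Developer(Abdhesh,Software Engineer,8,1990-07-20 00:00:00.0)
Developer(Shikha,Fullstak Developer,9,1989-07-02 00:00:00.0)
inferSchema = false Took 718 millis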

If you don’t want to define the schema explicitly, you can derive it from an encoder.

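import org.apache.spark.sql.{Encoder, Encoders}

time("inferSchema = false, derive schema from an encoder") {
    implicit val encoderDeveloper: Encoder[Developer] = Encoders.product[Developer]
    val developerDF = spark.read
      .option("header", "true")
      .option("timestampFormat", "yyyy-MM-dd")
      .schema(encoderDeveloper.schema) // StructType derived from the Developer case class
      .csv(csvPath) // csvPath: path to the sample CSV above (not shown in the original)
    val developerDS = developerDF.as[Developer]
    developerDS.collect().foreach(println)
}

Output:
Developer(Abdhesh,Software Engineer,8,1990-07-20 00:00:00.0)
Developer(Shikha,Fullstak Developer,9,1989-07-02 00:00:00.0)
inferSchema = false, derive schema from an encoder Took 388 millis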

3- Infer schema dynamically

We can also infer the schema dynamically from the first row of the CSV (after the header row) and then apply it while reading the full CSV. It is a handy trick when you don’t know the schema of the CSV in advance.

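 time("Infer schema from first row") {
    import spark.implicits._
    // Read only the header and the first data row as plain text lines
    val firstLines = spark.read.textFile(csvPath).limit(2) // csvPath: path to the sample CSV (not shown in the original)
    // Infer the schema from those two lines only (DataFrameReader.csv(Dataset[String]) needs Spark 2.2+)
    val developerDF1RowSchema = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .option("timestampFormat", "yyyy-MM-dd")
      .csv(firstLines)
      .schema

    // Read the full file with the schema inferred above, so no extra inference pass is needed
    val developerDF = spark.read
      .option("header", "true")
      .option("timestampFormat", "yyyy-MM-dd")
      .schema(developerDF1RowSchema)
      .csv(csvPath)

    val developerDS = developerDF.as[Developer]
    developerDS.collect().foreach(println)
 }

Output:
Developer(Abdhesh,Software Engineer,8,1990-07-20 00:00:00.0)
Developer(Shikha,Fullstak Developer,9,1989-07-02 00:00:00.0)
Infer schema from first row Took 3570 millis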

If you compare approaches 1 and 2, the processing time dropped by ~96%, and my CSV file has just two records. With a huge CSV file, inferSchema = true adds a full extra pass over the data, so defining the schema explicitly pays off even more. So avoid inferSchema = true whenever you know the schema. If you want, you can get the code from my GitHub repository.

Happy coding 🙂

Kafka Stream Topology Testing

In this blog post, I will explain how to test Kafka stream topologies.

Kafka Streams topologies can be quite complex, so it is important for developers to test their code. The kafka-streams-test-utils artifact, new in Kafka 1.1.0, provides the TopologyTestDriver, ConsumerRecordFactory, and OutputVerifier classes. You can include it as a regular dependency of your unit tests and use the test driver to test the business logic of your Kafka Streams application.

Add the dependency below to build.sbt:

libraryDependencies += "org.apache.kafka" % "kafka-streams-test-utils" % "1.1.0" % Test

The code below is the well-known word count application.

import java.lang.Long // java.lang.Long, to match Serdes.Long()

import org.apache.kafka.common.serialization._
import org.apache.kafka.common.utils.Bytes
import org.apache.kafka.streams.kstream.{KStream, KTable, Materialized, Produced}
import org.apache.kafka.streams.state.KeyValueStore
import org.apache.kafka.streams.{StreamsBuilder, Topology}

import scala.collection.JavaConverters._

class WordCountApplication {
  def countNumberOfWords(inputTopic: String,
                         outputTopic: String, storeName: String): Topology = {
    val builder: StreamsBuilder = new StreamsBuilder()
    val textLines: KStream[String, String] = builder.stream(inputTopic)
    val wordCounts: KTable[String, Long] = textLines
      // split each line into lower-case words
      .flatMapValues(textLine => textLine.toLowerCase.split("\\W+").toIterable.asJava)
      // re-key each record by the word itself
      .groupBy((_, word) => word)
      // count occurrences per word in the named state store
      .count(Materialized.as(storeName).asInstanceOf[Materialized[String, Long, KeyValueStore[Bytes, Array[Byte]]]])
    wordCounts.toStream().to(outputTopic, Produced.`with`(Serdes.String(), Serdes.Long()))
    builder.build()
  }
}

Unit test for the above topology:

import java.util.Properties

import org.apache.kafka.common.serialization._
import org.apache.kafka.streams.StreamsConfig

trait TestSpec {
  protected val stringDeserializer = new StringDeserializer()
  protected val longDeserializer = new LongDeserializer()
  val config = new Properties()
  config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "word-count-application")
  config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234") // required, but never contacted by the test driver
  config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass.getName)
  config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass.getName)
}

import org.apache.kafka.common.serialization.StringSerializer
import org.apache.kafka.streams.TopologyTestDriver
import org.apache.kafka.streams.state.KeyValueStore
import org.apache.kafka.streams.test.ConsumerRecordFactory
import org.scalatest.{FlatSpec, Matchers}

class WordCountSpec extends FlatSpec with Matchers with TestSpec {

  behavior of "WordCountApplication"

  it should "count number of words" in {
    val wordCountApplication = new WordCountApplication()
    val driver = new TopologyTestDriver(wordCountApplication.countNumberOfWords("input-topic", "output-topic", "counts-store"), config)
    try {
      val recordFactory = new ConsumerRecordFactory("input-topic", new StringSerializer(), new StringSerializer())
      val words = "Hello Kafka Streams, All streams lead to Kafka"
      driver.pipeInput(recordFactory.create(words)) // one record with a null key
      val store: KeyValueStore[String, java.lang.Long] = driver.getKeyValueStore("counts-store")
      store.get("hello") shouldBe 1
      store.get("kafka") shouldBe 2
      store.get("streams") shouldBe 2
      store.get("lead") shouldBe 1
      store.get("to") shouldBe 1
    } finally {
      driver.close() // release the driver's state stores
    }
  }
}

Let me explain the classes used in testing the topology.

TopologyTestDriver:
This class makes it easier to write tests to verify the behaviour of topologies created with Topology or StreamsBuilder. You can test simple topologies that have a single processor, or very complex topologies that have multiple sources, processors, sinks, or sub-topologies.

The best thing about TopologyTestDriver is that it works without a real Kafka broker, so the tests execute very quickly with very little overhead.

Using the TopologyTestDriver in tests is easy: simply instantiate the driver with a Topology (for example, from StreamsBuilder#build()) and a Properties config, use the driver to supply input messages to the topology, and then use the driver to read and verify any messages output by the topology.

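Although the driver doesn’t use a real Kafka broker, it does simulate Kafka consumers and producers that read and write raw (byte[]) messages.
You can either work with messages that have byte[] keys and values, or use ConsumerRecordFactory and OutputVerifier, which work with regular Java objects instead of raw bytes.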

Driver Set-up:
In order to create a TopologyTestDriver instance, you need a Topology and a Properties.
The configuration needs to be representative of what you’d supply to the real topology, so that means including several key properties (StreamsConfig).
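For example, the following code fragment creates a configuration that specifies an application id, a local Kafka broker list (which is needed but not used), a timestamp extractor, and default serializers and deserializers for string keys and values:

import java.util.Properties

import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.{StreamsConfig, Topology, TopologyTestDriver}

val props = new Properties()
props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "test-application") // required by StreamsConfig; any id works
props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091") // needed, but never contacted by the driver
// CustomTimestampExtractor stands for your own TimestampExtractor implementation
props.setProperty(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, classOf[CustomTimestampExtractor].getName)
props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass.getName)
props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass.getName)
val topology: Topology = ??? // your topology, e.g. from StreamsBuilder#build()
val driver = new TopologyTestDriver(topology, props)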

Processing messages:
Here’s an example of an input message on the topic named input-topic.

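import org.apache.kafka.common.serialization.StringSerializer
import org.apache.kafka.streams.test.ConsumerRecordFactory

val strSerializer = new StringSerializer()
val factory = new ConsumerRecordFactory(strSerializer, strSerializer)
driver.pipeInput(factory.create("input-topic", "key1", "value1"))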

When TopologyTestDriver#pipeInput() is called, the driver passes the input message through to the appropriate source that consumes the named topic and invokes the processor(s) downstream of the source. (pipeInput sends an input message with the given key, value, and timestamp on the specified topic to the topology and then commits the messages.)

If your topology’s processors forward messages to sinks, your test can then consume these output messages to verify they match the expected outcome.
For example, if our topology should have generated 2 messages on output-topic-1 and 1 message on output-topic-2, then our test can obtain these messages using the TopologyTestDriver#readOutput(String, Deserializer, Deserializer) method, which reads the next record from the given topic:

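import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringDeserializer

val strDeserializer = new StringDeserializer()
// readOutput returns null when the topic has no more records
val record1: ProducerRecord[String, String] = driver.readOutput("output-topic-1", strDeserializer, strDeserializer)
val record2: ProducerRecord[String, String] = driver.readOutput("output-topic-1", strDeserializer, strDeserializer)
val record3: ProducerRecord[String, String] = driver.readOutput("output-topic-2", strDeserializer, strDeserializer)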

Processor state:
Some processors use Kafka state storage (StateStore), so this driver class provides the generic
getStateStore(store-name) as well as store-type-specific methods (such as getKeyValueStore) so that your tests can check the underlying state store(s) used by your topology’s processors.
In our previous example, after we supplied a single input message and checked the three output messages, our test could also check the key-value store to verify the processor correctly added, removed, or updated internal state.
Our test might have pre-populated some state before submitting the input message and verified afterwards that the processor(s) correctly updated the state.

Here is the Kafka-Streaming-Test GitHub code that I used in this blog post. In the next blog post, I will show how to test complex topologies such as joins and KTables.




When should you use Future/Future.successful/Future.failed?

In this blog post, I recommend when you should use Future()/Future.successful/Future.failed.

    1. Use Future.apply() or simply Future() (i.e., a Future block): when something should be done asynchronously and complete sometime in the future, typically a time-consuming operation such as a network call, a database operation, communicating with one or more other services, or processing huge data on multiple cores.
    2. Use Future.successful: when a literal or already computed value should be passed back as a successful Future.
    3. Use Future.failed: when a known exception should be passed back as a failed Future without performing any further work.
    4. Use Future.fromTry: when you already have a computed Try value.

In short, use Future.successful, Future.failed, and Future.fromTry when you need an instance of Future and you already have the value (or the exception). Unlike Future.apply, they take no ExecutionContext and schedule no work.
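A minimal sketch that uses all four constructors in one place. The cache, slowLookup, lookup, and parse names are hypothetical, and Thread.sleep only stands in for real network or database I/O:

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.Try

object FutureChoices {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Hypothetical in-memory cache, used only for illustration
  private val cache: Map[String, Int] = Map("alice" -> 42)

  // Future.apply: the body runs asynchronously on the ExecutionContext
  def slowLookup(user: String): Future[Int] = Future {
    Thread.sleep(50) // stands in for a network or database call
    user.length
  }

  def lookup(user: String): Future[Int] =
    if (user.isEmpty)
      Future.failed(new IllegalArgumentException("empty user")) // known failure, nothing to run
    else
      cache.get(user) match {
        case Some(cached) => Future.successful(cached) // value already computed
        case None         => slowLookup(user)          // real asynchronous work
      }

  // Future.fromTry: lift an already computed Try into a Future
  def parse(s: String): Future[Int] = Future.fromTry(Try(s.toInt))
}
```

For example, Await.result(FutureChoices.lookup("alice"), 1.second) returns 42 without scheduling any task, while lookup("bob") goes through the asynchronous path. Await is used here only to inspect results; in application code you would compose the Futures instead of blocking.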






An Easy and fast way of installing Java, Scala and Spark

1. Download and Install Java 8

abdhesh@abdhesh-latitude:~/Documents/Applications$ wget http://download.oracle.com/otn-pub/java/jdk/8u151-b12/e758a0de34e24606bca991d704f6dcbf/jdk-8u151-linux-x64.tar.gz

Extract tar file:

abdhesh@abdhesh-latitude:~/Documents/Applications$ tar -xf jdk-8u151-linux-x64.tar.gz 
abdhesh@abdhesh-latitude:~/Documents/Applications$ ls
jdk-8u151-linux-x64  jdk-8u151-linux-x64.tar.gz

Set environment path variable for Java

abdhesh@abdhesh-latitude:~/Documents/Applications$ sudo vim ~/.bashrc

The above command opens the file; add the following lines at the end of it:

export JAVA=/home/abdhesh/Documents/Applications/jdk-8u151-linux-x64
export PATH=$JAVA/bin:$PATH

Save and exit. Now reload the .bashrc file in the same terminal session:

abdhesh@abdhesh-latitude:~/Documents/Applications$ source ~/.bashrc 

Run java version command:

abdhesh@abdhesh-latitude:~/Documents/Applications$ java -version
java version "1.8.0_151"
Java(TM) SE Runtime Environment (build 1.8.0_151-b12)
Java HotSpot(TM) 64-Bit Server VM (build 25.151-b12, mixed mode)

2. Download and Install Scala

abdhesh@abdhesh-latitude:~/Documents/Applications$ wget https://downloads.lightbend.com/scala/2.12.4/scala-2.12.4.tgz

Extract tar file:

abdhesh@abdhesh-latitude:~/Documents/Applications$ tar -xf scala-2.12.4.tgz 
abdhesh@abdhesh-latitude:~/Documents/Applications$ ls
scala-2.12.4  scala-2.12.4.tgz

Set the environment path variable for Scala:

abdhesh@abdhesh-latitude:~/Documents/Applications$ sudo vim ~/.bashrc

Add the following lines at the end of the file:

export SCALA=/home/abdhesh/Documents/Applications/scala-2.12.4
export PATH=$JAVA/bin:$SCALA/bin:$PATH

Save and exit. Now reload the .bashrc file in the same terminal session:

abdhesh@abdhesh-latitude:~/Documents/Applications$ source ~/.bashrc 

Run scala version command:

abdhesh@abdhesh-latitude:~/Documents/Applications$ scala -version
Scala code runner version 2.12.4 -- Copyright 2002-2017, LAMP/EPFL and Lightbend, Inc.

3. Download and Install Apache Spark

abdhesh@abdhesh-latitude:~/Documents/Applications$ wget http://apache.mirror.anlx.net/spark/spark-2.2.1/spark-2.2.1-bin-hadoop2.7.tgz

Extract tar file:

abdhesh@abdhesh-latitude:~/Documents/Applications$ tar -xf spark-2.2.1-bin-hadoop2.7.tgz 
abdhesh@abdhesh-latitude:~/Documents/Applications$ ls
spark-2.2.1-bin-hadoop2.7  spark-2.2.1-bin-hadoop2.7.tgz

Set environment path variable for spark

abdhesh@abdhesh-latitude:~/Documents/Applications$ sudo vim ~/.bashrc

The above command opens the file; add the following lines at the end of it:

export SPARK=/home/abdhesh/Documents/Applications/spark-2.2.1-bin-hadoop2.7
export PATH=$JAVA/bin:$SCALA/bin:$SPARK/bin:$PATH

Now reload the .bashrc file in the same terminal session:

abdhesh@abdhesh-latitude:~/Documents/Applications$ source ~/.bashrc 

Run the Spark shell:

Here is the link to the people.json file.

abdhesh@abdhesh-latitude:~/Documents/Applications$ spark-shell 
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
17/12/28 01:02:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/12/28 01:02:17 WARN Utils: Your hostname, abdhesh-latitude resolves to a loopback address:; using instead (on interface wlp2s0)
17/12/28 01:02:17 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Spark context Web UI available at
Spark context available as 'sc' (master = local[*], app id = local-1514422939241).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.2.1
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_151)
Type in expressions to have them evaluated.
Type :help for more information.

scala> val df  = spark.read.json("spark-2.2.1-bin-hadoop2.7/data/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> df.filter("age >= 19").select("name","age").show()
+------+---+
|  name|age|
+------+---+
|  Andy| 30|
|Justin| 19|
+------+---+

scala> //using Sql

scala> df.createOrReplaceTempView("people")

scala> spark.sql("SELECT * FROM people WHERE age >=19").show()
+---+------+
|age|  name|
+---+------+
| 30|  Andy|
| 19|Justin|
+---+------+


Stay tuned for the next blog post 🙂

Part-1: Multiple ways of handling an exception in scala

There are multiple ways of handling an exception in Scala. In this blog post, I will explain them one by one.

1- Using try/catch/finally

  val tryCatch = try {
    //Code here that might raise an exception
    throw new Exception
  } catch {
    case ex: Exception =>
      //Code here to handle an exception
  }

  val tryMultipleCatch = try {
    //Code here that might raise an exception
    throw new Exception
  } catch {
    case ae: ArithmeticException =>
    //Code here to handle an ArithmeticException
    case ex: Exception =>
    //Code here to handle any other Exception
  }

  val tryMultipleCatchFinally = try {
    //Code here that might raise an exception
    throw new Exception
  } catch {
    case ae: ArithmeticException =>
    //Code here to handle an ArithmeticException
    case ex: Exception =>
    //Code here to handle an Exception
  } finally {
    //Code here always executes, whether or not an exception is thrown
  }

  // try/catch is an expression, so it can produce a value
  val tryCatchWithValue: Int = try {
    //Code here that might raise an exception
    "abc".toInt
  } catch {
    case ne: NumberFormatException => 0
  }

2- Using scala.util.Try
The Try type represents a computation that may either result in an exception or return a successfully computed value.
Instances of Try[T] are either an instance of scala.util.Success[T] or scala.util.Failure[T].

Try can pipeline, or chain, operations with combinators such as map and flatMap, catching exceptions along the way.
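Because Try has map and flatMap, several steps that can each fail can be chained in a for-comprehension, and the first failure short-circuits the rest. A small sketch (TryChaining, divideStrings, and describe are illustrative names):

```scala
import scala.util.{Failure, Success, Try}

object TryChaining {
  // Parses both strings and divides them; any non-fatal exception along the way becomes a Failure
  def divideStrings(a: String, b: String): Try[Int] =
    for {
      x <- Try(a.toInt)
      y <- Try(b.toInt)
      q <- Try(x / y)
    } yield q

  def describe(result: Try[Int]): String = result match {
    case Success(value) => s"ok: $value"
    case Failure(ex)    => s"failed: ${ex.getClass.getSimpleName}"
  }
}
```

For example, divideStrings("10", "2") is Success(5), while divideStrings("10", "0") and divideStrings("ten", "2") are Failures carrying an ArithmeticException and a NumberFormatException respectively.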

import scala.util.{Failure, Success, Try}

  val withTry = Try("1".toInt) // Success(1)
  withTry match {
    case Success(value) => println(value)
    case Failure(ex) =>
      //Code here to handle an exception
  }

  val tryWithRecover = Try("Non-Numeric-Value".toInt) match {
    case Success(value) => println(value)
    case Failure(ex) => println(ex) // java.lang.NumberFormatException: For input string: "Non-Numeric-Value"
  }

  //Try's map, flatMap, fold etc.
  def inc(n: Int): Int = n + 1

  val try1 = Try("abc".toInt)
  val tResult = try1.map(f => inc(f)) // inc runs only if try1 is a Success; "abc".toInt fails, so tResult is a Failure

Try’s recover and recoverWith: apply the given partial function if this is a Failure; otherwise return this Success unchanged.

//Recover with a value
val tryWithRecoverF = Try("Non-Numeric-Value".toInt).recover {
    //Here you pattern match on the type of the exception
    case ne: NumberFormatException => 0
    case ex: Exception => 0
}

//Recover with another Try
  def recoverWith(first: String, second: String): Try[Int] = {
    //The partial function passed to recoverWith runs only when `Try(first.toInt)` raises an exception
    Try(first.toInt).recoverWith {
      case ne: NumberFormatException => Try(second.toInt)
    }
  }

Note: all Try combinators such as map, flatMap, filter, recover, recoverWith, transform, and collect catch exceptions; fold catches an exception thrown by its success function and passes it to its failure function.

def compute(number: Int, divideBY: Int): Int = number / divideBY

  val t1 = Try("123".toInt).map(n => compute(n, 2)) //Success(61)
  val t2 = Try("123".toInt).map(n => compute(n, 0)) //Failure(java.lang.ArithmeticException: / by zero)

def computeWithTry(value: String): Try[Int] = Try(value.toInt)

  val r1: Try[Int] = computeWithTry("123")
  r1.fold(
    ex => println(ex),
    value => println(compute(value, 2))
  ) // 61

  r1.fold(
    ex => println(s"Exception--${ex}"),
    value => println(compute(value, 0))
  ) // Exception--java.lang.ArithmeticException: / by zero

  computeWithTry("abc").fold(
    ex => println(ex),
    value => println(compute(value, 2))
  ) // java.lang.NumberFormatException: For input string: "abc"

  computeWithTry("123").map(n => compute(n, 2)) //Success(61)
  computeWithTry("123").map(n => compute(n, 0)) //Failure(java.lang.ArithmeticException: / by zero)
  computeWithTry("abc").map(n => compute(n, 2)) //Failure(java.lang.NumberFormatException: For input string: "abc")

Note: only non-fatal exceptions are caught by the combinators on Try (see scala.util.control.NonFatal). Serious system errors, on the other hand, will be thrown.
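A small sketch of that difference, using InterruptedException as an example of a throwable that NonFatal treats as fatal. NonFatalDemo, capturedByTry, and parseOrZero are illustrative names; the last method shows that the same NonFatal extractor can be used in a plain try/catch:

```scala
import scala.util.Try
import scala.util.control.NonFatal

object NonFatalDemo {
  // true if Try turned the throwable into a Failure, false if Try let it propagate
  def capturedByTry(t: Throwable): Boolean =
    try Try(throw t).isFailure
    catch { case _: Throwable => false } // only reached when Try rethrew `t`

  // Catch only non-fatal exceptions; fatal errors still propagate
  def parseOrZero(s: String): Int =
    try s.toInt
    catch { case NonFatal(_) => 0 }
}
```

For example, capturedByTry(new RuntimeException("boom")) is true, while capturedByTry(new InterruptedException()) is false because Try rethrows it.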

Here you can find the complete code.

Stay tuned for the next part 🙂

References: scala.util.Try

Methods with variable arguments

  • Scala methods can have variable arguments (varargs).
  • A method can be specified to have a variable number of arguments by adding a * after the type of the parameter.
  • A method can have only one variable-argument parameter, and it must be the last parameter.
  • As an example, let’s define a method that takes a variable number of arguments of type String and returns their concatenation as a String:
scala> def concatStrings(s: String*): String = s.mkString
concatStrings: (s: String*)String

scala> concatStrings("a", "b", "c")
res0: String = abc

scala> def concatStringsSep(separator: String, s: String*): String = s.mkString(separator)
concatStringsSep: (separator: String, s: String*)String

scala> concatStringsSep("/", "a", "b", "c")
res1: String = a/b/c

You can pass a sequence as variable-length arguments to a function by appending :_* to it:

scala> val listOfStrings = List("first","second","third")
listOfStrings: List[String] = List(first, second, third)

scala> concatStrings(listOfStrings:_*)
res5: String = firstsecondthird

scala> concatStringsSep(",",listOfStrings:_*)
res6: String = first,second,third
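The same :_* ascription is how one varargs method forwards its arguments to another. A sketch that redefines concatStringsSep from above; joinPath and countNonEmpty are illustrative names:

```scala
object VarargsForwarding {
  def concatStringsSep(separator: String, s: String*): String = s.mkString(separator)

  // Forward the received varargs unchanged with `: _*`
  def joinPath(parts: String*): String = concatStringsSep("/", parts: _*)

  // Inside the method, a vararg parameter is a Seq[String], so Seq methods work on it
  def countNonEmpty(s: String*): Int = s.count(_.nonEmpty)
}
```

Without the ascription, concatStringsSep("/", parts) would not compile, because parts is a Seq[String] rather than a single String. Calling joinPath() with no arguments is also allowed and returns an empty string.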

Scala pure/total functions


In this blog post, I am going to explain pure/total functions.

Let’s first discuss what a function is.

A function is a process which takes some input, called arguments, and produces some output, called a return value.

A pure function is a function which:
1. Given the same input, always returns the same output (this property is called determinism).
2. Produces no side effects.

It describes how inputs relate to outputs, without spelling out the steps to get from A to B. Every function call must produce its result in isolation. Pure functions are required to construct pure expressions.

The function result value cannot depend on any hidden information or state that may change while program execution proceeds or between different executions of the program, nor can it depend on any external input from I/O devices.

Pure functions are easy to parallelize. They are referentially transparent, so we only need to compute their output once for a given input. Caching and reusing the result of a computation is called memoization, and can only be done safely with pure functions.
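Memoization can be sketched with a mutable map as a cache. This is only safe because the wrapped function is pure; the names Memo, memoize, slowSquare, and fastSquare are illustrative, the call counter is a deliberate side effect used only to show that the cache works, and the cache is not thread-safe:

```scala
import scala.collection.mutable

object Memo {
  // Wraps a pure function so each distinct input is computed at most once
  def memoize[A, B](f: A => B): A => B = {
    val cache = mutable.Map.empty[A, B]
    (a: A) => cache.getOrElseUpdate(a, f(a))
  }

  var calls = 0 // counts real evaluations, for demonstration only
  def slowSquare(n: Int): Int = { calls += 1; n * n }

  val fastSquare: Int => Int = memoize((n: Int) => slowSquare(n))
}
```

Calling Memo.fastSquare(4) twice returns 16 both times, but slowSquare runs only once; a new input such as 5 triggers one more real evaluation.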

Pure functions are also extremely independent: they are easy to move around, refactor, and reorganize in your code, making your programs more flexible and adaptable to future changes.

In general,

Pure function = Output depends on input + No side effects

f: X -> Y means f is a function which takes X as input and returns Y as output.
Consider an expression like (f(3), f(3)), where we call the function f twice with the same input.
Since f is a pure function, we can rewrite the expression: I have lifted the common expression f(3) into a variable because I know every call to f(3) gives the same result.
val x = f(3)
val result = (x, x)

Examples of pure functions:

  • sin(x), returning the sine of a number x
  • length(s), returning the size of a string s

Pure functions in Scala,

scala> def square(a: Int) = a * a
square: (a: Int)Int

scala> def pureFunc(x : Int, y : Int) = x + y
pureFunc: (x: Int, y: Int)Int

scala> pureFunc(1,2)
res0: Int = 3
scala> (pureFunc(1,2), pureFunc(1,2))
res1: (Int, Int) = (3,3)
scala> val fr = pureFunc(1,2)
fr: Int = 3
scala> (fr, fr) // fr is not evaluated again; its value is reused
res2: (Int, Int) = (3,3)

scala> def impureFunc(x: Int,y: Int) = println(x+y)
impureFunc: (x: Int, y: Int)Unit

scala> impureFunc(1,2)
3

scala> def anotherImpureFunc(x: Int) = if(x > 0) x
anotherImpureFunc: (x: Int)AnyVal

scala> anotherImpureFunc(2)
res2: AnyVal = 2

scala> anotherImpureFunc(-1)
res3: AnyVal = ()
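anotherImpureFunc has no meaningful result for x <= 0, which is why its result type widens to AnyVal and the call with -1 yields (). One common way to make such a function total, that is, defined with a well-typed result for every input, is to make the missing case explicit with Option. A sketch (TotalFunctions, positiveOrNone, and positiveOrZero are illustrative names):

```scala
object TotalFunctions {
  // Total: every Int maps to a well-typed Option[Int]
  def positiveOrNone(x: Int): Option[Int] =
    if (x > 0) Some(x) else None

  // Callers must handle both cases, here with a default value
  def positiveOrZero(x: Int): Int = positiveOrNone(x).getOrElse(0)
}
```

The type Option[Int] now tells callers that a result may be absent, instead of leaking Unit through an AnyVal.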