What is Kleisli or composition of monadic functions in Scala?

Hi All,

Hope you are doing great!

In this blog post, I am going to explain what Kleisli (the composition of monadic functions) is and how to use it. Before getting to Kleisli, let’s look at how to define functions and compose them in Scala.

val getDataFromDb: Int => Int     = (id: Int) => 10   // Assume this returns an Int value from the DB
val processNumber: Int => Int     = (v: Int) => v * 2
val writeDataToDB: Int => Boolean = (v: Int) => true  // Assume a successful DB write returns true

We have now defined a few functions in Scala. You might be wondering why we defined functions instead of methods. In Scala, functions and methods are not the same thing: a method is not a value, and you cannot compose methods without eta expansion, which converts a method into a function value. If you want, you can read more about eta expansion and why methods are not functions.
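To make the difference concrete, here is a minimal sketch of eta expansion. The names EtaExpansionSketch, doubleIt, addOne and doubleThenAddOne are made up for this illustration and are not part of the examples in this post.

```scala
object EtaExpansionSketch {
  // A method: it belongs to the enclosing object and is not itself a value.
  def doubleIt(v: Int): Int = v * 2

  // A function value: an instance of Function1[Int, Int].
  val addOne: Int => Int = (v: Int) => v + 1

  // Eta expansion: the trailing underscore turns the method into a function value.
  val doubleItFn: Int => Int = doubleIt _

  // Function values have andThen/compose, so they can be composed.
  val doubleThenAddOne: Int => Int = doubleItFn andThen addOne
}
```

Calling EtaExpansionSketch.doubleThenAddOne(5) doubles first and then adds one.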

Hopefully functions are clear by now. Functional programming is a popular topic these days, and there is plenty of material on the internet explaining what it is and why it matters now, but there is a brief definition of functional programming by @jdegoes:

“The rest is just composition you can learn over time.” Composition is the topic of this blog post.

Let’s take an example:

scala> val f: String => String = (s: String) => "f(" + s + ")"
f: String => String = $$Lambda$1078/554280593@e521067

scala> val g: String => String = (s: String) => "g(" + s + ")"
g: String => String = $$Lambda$1079/753170002@65d90b7f

scala> val fComposeG = f compose g // It is similar to call f(g("Hello,World"))
fComposeG: String => String = scala.Function1$$Lambda$1080/820069375@452ec287

scala> fComposeG("Hello,World")
res1: String = f(g(Hello,World))

scala> val fAndThenG = f  andThen g  // It is similar to g(f("Hello,World"))
fAndThenG: String => String = scala.Function1$$Lambda$1065/1796415927@40c6d1ef

scala> fAndThenG("Hello,World")
res2: String = g(f(Hello,World))

Every function that takes a single input is syntactic sugar for Function1[-T1, +R], which has two composition methods:

def andThen[A](g: (R) ⇒ A): (T1) ⇒ A
Composes two instances of Function1 in a new Function1, with this function applied first.

def compose[A](g: (A) ⇒ T1): (A) ⇒ R
Composes two instances of Function1 in a new Function1, with this function applied last.

Hopefully it is now clear how to compose functions. Now let’s go back to the first example.

scala> val result = getDataFromDb andThen processNumber andThen writeDataToDB
result: Int => Boolean = scala.Function1$$Lambda$1065/1796415927@25291901

scala> result(12)
res3: Boolean = true

Sometimes you want your output wrapped in some context, for example to delay processing or to run the program as an effect. If you want to learn about effects, there is an awesome talk called Functional Programming with Effects.

Let’s define functions that return a monadic value, i.e. a value in a context such as Option, Either, Try, Future, IO, Reader, Writer etc.:

//Below are the Monadic Functions

val getDataFromDbOpt: Int => Option[Int] = (id: Int) => Some(10) 
val processNumberOpt: Int => Option[Int]       = (v: Int) => Some(v * 2)
val writeDataToDBOpt: Int => Option[Boolean] = (v: Int) => Some(true)

val result = getDataFromDbOpt andThen processNumberOpt andThen writeDataToDBOpt

//It will give you below compilation error
error: type mismatch;
 found   : scala.this.Function1[scala.this.Int,scala.this.Option[scala.this.Int]]
 required: scala.this.Function1[scala.this.Option[scala.this.Int],?]
  val result = getDataFromDbOpt andThen processNumberOpt andThen writeDataToDBOpt

Oops, this is not what we expected: andThen needs the next function to accept an Option[Int], but processNumberOpt accepts an Int. Let’s fix this error.
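Before introducing a wrapper, it may help to see the same pipeline composed by hand with flatMap. This is only a sketch: the object name OptionCompositionSketch is made up, and the three functions are copies of the ones defined above.

```scala
object OptionCompositionSketch {
  val getDataFromDbOpt: Int => Option[Int]     = (id: Int) => Some(10)
  val processNumberOpt: Int => Option[Int]     = (v: Int) => Some(v * 2)
  val writeDataToDBOpt: Int => Option[Boolean] = (v: Int) => Some(true)

  // andThen would hand an Option[Int] to a function that expects an Int,
  // so each step is unwrapped with flatMap instead.
  val result: Int => Option[Boolean] =
    (id: Int) => getDataFromDbOpt(id).flatMap(processNumberOpt).flatMap(writeDataToDBOpt)
}
```

Writing the flatMap chain out for every pipeline gets repetitive, which is the motivation for wrapping the function in a small case class.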

final case class KleisliForOption[A, B](run: A => Option[B]) {
  def andThen[C](k: KleisliForOption[B, C]): KleisliForOption[A, C] =
    KleisliForOption { a => run(a).flatMap(b => k.run(b)) }
}

val getDataFromDbOpt: Int => Option[Int]     = (id: Int) => Some(10)
val processNumberOpt: Int => Option[Int]     = (v: Int) => Some(v * 2)
val writeDataToDBOpt: Int => Option[Boolean] = (v: Int) => Some(true) 

val result = KleisliForOption(getDataFromDbOpt) andThen KleisliForOption(processNumberOpt) andThen KleisliForOption(writeDataToDBOpt)
result.run(12) //Output Some(true)

Now you can see that we are able to compose functions that return the monadic value Option. What if your functions return the monadic value Either?
For example,

val getDataFromDbEither: Int => Either[String,Int]     = (id: Int) => Right(10)
val processNumberEither: Int => Either[String,Int]     = (v: Int) => Right(v * 2)
val writeDataToDBEither: Int => Either[String,Boolean] = (v: Int) => Right(true)

val result = KleisliForOption(getDataFromDbEither) andThen KleisliForOption(processNumberEither) andThen KleisliForOption(writeDataToDBEither)
//It fails with the below compilation error
Error:(12, 112) type mismatch;
 found   : Int => Either[String,Int]
 required: ? => Option[?]
val result = KleisliForOption(getDataFromDbEither) andThen KleisliForOption(processNumberEither) andThen KleisliForOption(writeDataToDBEither) 

Let’s fix this error as well,

final case class KleisliForEither[A, B, E](run: A => Either[E, B]) {
  def andThen[C](k: KleisliForEither[B, C, E]): KleisliForEither[A, C, E] =
    KleisliForEither { a => run(a).flatMap(b => k.run(b)) }
}

val getDataFromDbEither: Int => Either[String,Int]     = (id: Int) => Right(10)
val processNumberEither: Int => Either[String,Int]     = (v: Int) => Right(v * 2)
val writeDataToDBEither: Int => Either[String,Boolean] = (v: Int) => Right(true)

val result = KleisliForEither(getDataFromDbEither) andThen KleisliForEither(processNumberEither) andThen KleisliForEither(writeDataToDBEither)
result.run(12) //Output Either[String,Boolean] = Right(true)

If your functions return other monadic values such as Option, Either, Try, Future, IO, Reader, Writer etc., you would have to define a similar structure for every one of them, as I did above. Let’s not define our own structure, because the same structure is already defined in the cats library, and it is much more powerful than what I have defined.

Kleisli Type Signature:

The Kleisli type is a wrapper around a function A => F[B]:

final case class Kleisli[F[_], A, B](run: A => F[B])

Here F is some context that is a Monad, A is the input and B is the output.
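To show how one wrapper can cover both Option and Either, here is a rough sketch that uses only the standard library. It is not how cats implements Kleisli: the Monad trait, the ErrorOr alias, MiniKleisli and the two pipelines are hypothetical names invented for this illustration.

```scala
object MiniKleisliSketch {
  // Hypothetical, minimal Monad type class (cats ships a much richer one).
  trait Monad[F[_]] {
    def flatMap[A, B](fa: F[A])(f: A => F[B]): F[B]
  }

  implicit val optionMonad: Monad[Option] = new Monad[Option] {
    def flatMap[A, B](fa: Option[A])(f: A => Option[B]): Option[B] = fa.flatMap(f)
  }

  type ErrorOr[A] = Either[String, A]

  implicit val errorOrMonad: Monad[ErrorOr] = new Monad[ErrorOr] {
    def flatMap[A, B](fa: ErrorOr[A])(f: A => ErrorOr[B]): ErrorOr[B] = fa.flatMap(f)
  }

  // Named MiniKleisli to avoid confusion with cats.data.Kleisli.
  final case class MiniKleisli[F[_], A, B](run: A => F[B]) {
    // Composition is written once and works for any F that has a Monad instance.
    def andThen[C](k: MiniKleisli[F, B, C])(implicit F: Monad[F]): MiniKleisli[F, A, C] =
      MiniKleisli(a => F.flatMap(run(a))(k.run))
  }

  val optPipeline: MiniKleisli[Option, Int, Boolean] =
    MiniKleisli[Option, Int, Int](_ => Some(10))
      .andThen(MiniKleisli[Option, Int, Int](v => Some(v * 2)))
      .andThen(MiniKleisli[Option, Int, Boolean](_ => Some(true)))

  val eitherPipeline: MiniKleisli[ErrorOr, Int, Int] =
    MiniKleisli[ErrorOr, Int, Int](id => if (id > 0) Right(10) else Left("invalid id"))
      .andThen(MiniKleisli[ErrorOr, Int, Int](v => Right(v * 2)))
}
```

The design point is that andThen asks for a Monad[F] instead of hard-coding Option or Either, so adding another context only needs another instance, not another wrapper class.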

  import cats.Id
  import cats.effect.IO
  import cats.data.Kleisli
  import cats.implicits._

  //For Option
  val getDataFromDbOpt: Int => Option[Int]     = (id: Int) => Some(10)
  val processNumberOpt: Int => Option[Int]     = (v: Int) => Some(v * 2)
  val writeDataToDBOpt: Int => Option[Boolean] = (v: Int) => Some(true)

  val resultOpt = Kleisli(getDataFromDbOpt) andThen Kleisli(processNumberOpt) andThen Kleisli(writeDataToDBOpt)
  resultOpt.run(12) //Option[Boolean]

  //For Id monad
  val getDataFromDbFun: Kleisli[Id, Int, Int]     = Kleisli.apply(id => 10: Id[Int]) // Type ascription is needed because the compiler does not infer Id[Int]
  val processNumberFun: Kleisli[Id, Int, Int]     = Kleisli.apply(v => (v * 2): Id[Int])
  val writeDataToDBFun: Kleisli[Id, Int, Boolean] = Kleisli.apply(v => true: Id[Boolean])

  val resultFunc = getDataFromDbFun andThen processNumberFun andThen writeDataToDBFun
  resultFunc.run(12) //Id[Boolean] which is Boolean

 //For Either
  type ET[A] = Either[String, A]
  val getDataFromDbEither: Kleisli[ET, Int, Int] =
    Kleisli[ET, Int, Int](id => Right(10))

  val processNumberEither: Kleisli[ET, Int, Int]     = Kleisli[ET, Int, Int](v => Right(v * 2))
  val writeDataToDBEither: Kleisli[ET, Int, Boolean] = Kleisli[ET, Int, Boolean](v => Right(true))

  val result = getDataFromDbEither andThen processNumberEither andThen writeDataToDBEither
  result.run(12) //Either[String,Boolean]

  import scala.concurrent.Future
  import scala.concurrent.ExecutionContext.Implicits.global

  //For Future
  val getDataFromDbFuture: Kleisli[Future, Int, Int] = Kleisli.apply(_ => Future.successful(10))
  val processNumberFuture: Kleisli[Future, Int, Int] = Kleisli.apply(v => Future.successful(v * 2))
  val writeDataToDBFuture: Kleisli[Future, Int, Boolean] =
    Kleisli.apply(_ => Future.successful(true))

  val resultFuture = getDataFromDbFuture andThen processNumberFuture andThen writeDataToDBFuture
  resultFuture.run(12) //Future[Boolean]

  //For IO
  val getDataFromDbIO: Kleisli[IO, Int, Int]     = Kleisli.apply(_ => IO.pure(10))
  val processNumberIO: Kleisli[IO, Int, Int]     = Kleisli.apply(v => IO(v * 2))
  val writeDataToDBIO: Kleisli[IO, Int, Boolean] = Kleisli.apply(_ => IO.pure(true))

  val resultIO = getDataFromDbIO andThen processNumberIO andThen writeDataToDBIO
  resultIO.run(12)/*IO[Boolean]*/.unsafeRunSync() //Boolean

You can also use a for-comprehension with Kleisli.

//For IO
  val getDataFromDbIO: Kleisli[IO, Int, Int]     = Kleisli.apply(_ => IO.pure(10))
  val processNumberIO: Kleisli[IO, Int, Int]     = Kleisli.apply(v => IO(v * 2))
  val writeDataToDBIO: Kleisli[IO, Int, Boolean] = Kleisli.apply(_ => IO.pure(true))

  val anotherFunction: Int => Kleisli[IO, Int, Int] = (v: Int) => Kleisli.liftF(processNumberIO(v))
  val oneMoreFunction: Int => Kleisli[IO, Int, Boolean] = (v: Int) => Kleisli.liftF(writeDataToDBIO(v))

  val resultOfFor: Kleisli[IO, Int, Boolean] = for {
    v  <- getDataFromDbIO
    vv <- anotherFunction(v)
    r  <- oneMoreFunction(vv)
  } yield r

Kleisli is just a wrapper around a function A => F[B], where F is a Monad such as Option, Try, Either, IO or Future (Future is not strictly a lawful monad, but it is commonly treated as one). It enables the composition of monadic functions, i.e. functions of the form A => F[B] that return a monadic value.


Kafka Stream Topology Testing

In this blog post, I will explain how to test Kafka stream topologies.

Kafka Streams topologies can be quite complex, so it is important for developers to test their code. The kafka-streams-test-utils artifact provides the TopologyTestDriver, ConsumerRecordFactory, and OutputVerifier classes. You can add the artifact as a regular test dependency and use the test driver to test the business logic of your Kafka Streams application.

Add the dependency below to build.sbt:

libraryDependencies += "org.apache.kafka" % "kafka-streams-test-utils" % "1.1.0" % Test

The code example below is the well-known word count application.

import java.lang.Long

import org.apache.kafka.common.serialization._
import org.apache.kafka.common.utils.Bytes
import org.apache.kafka.streams.kstream.{KStream, KTable, Materialized, Produced}
import org.apache.kafka.streams.state.KeyValueStore
import org.apache.kafka.streams.{StreamsBuilder, Topology}

import scala.collection.JavaConverters._

class WordCountApplication {
  def countNumberOfWords(inputTopic: String,
                         outputTopic: String, storeName: String): Topology = {
    val builder: StreamsBuilder = new StreamsBuilder()
    // Read lines of text from the input topic
    val textLines: KStream[String, String] = builder.stream(inputTopic)
    val wordCounts: KTable[String, Long] = textLines
      // Split each line into lower-case words
      .flatMapValues(textLine => textLine.toLowerCase.split("\\W+").toIterable.asJava)
      // Re-key each record by the word itself
      .groupBy((_, word) => word)
      // Count occurrences per word and keep them in the named state store
      .count(Materialized.as(storeName).asInstanceOf[Materialized[String, Long, KeyValueStore[Bytes, Array[Byte]]]])
    wordCounts.toStream().to(outputTopic, Produced.`with`(Serdes.String(), Serdes.Long()))
    builder.build()
  }
}

Unit test for above topology.

import java.util.Properties
import org.apache.kafka.common.serialization._
import org.apache.kafka.streams.StreamsConfig
trait TestSpec {
  protected val stringDeserializer = new StringDeserializer()
  protected val longDeserializer = new LongDeserializer()
  val config = new Properties()
  config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "word-count-application")
  config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234")
  config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass.getName)
  config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass.getName)
}

import org.apache.kafka.common.serialization.StringSerializer
import org.apache.kafka.streams.TopologyTestDriver
import org.apache.kafka.streams.state.KeyValueStore
import org.apache.kafka.streams.test.ConsumerRecordFactory
import org.scalatest.{FlatSpec, Matchers}

class WordCountSpec extends FlatSpec with Matchers with TestSpec {

  it should "count number of words" in {
    val wordCountApplication = new WordCountApplication()
    val driver = new TopologyTestDriver(wordCountApplication.countNumberOfWords("input-topic", "output-topic", "counts-store"), config)
    val recordFactory = new ConsumerRecordFactory("input-topic", new StringSerializer(), new StringSerializer())
    val words = "Hello Kafka Streams, All streams lead to Kafka"
    // Pipe the sentence into the topology before inspecting the state store
    driver.pipeInput(recordFactory.create(words))
    val store: KeyValueStore[String, java.lang.Long] = driver.getKeyValueStore("counts-store")
    store.get("hello") shouldBe 1
    store.get("kafka") shouldBe 2
    store.get("streams") shouldBe 2
    store.get("lead") shouldBe 1
    store.get("to") shouldBe 1
    driver.close()
  }
}
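To see where the expected counts in the test come from, the same lower-case, split and count steps can be reproduced on a plain String without Kafka. This is only a sketch of the counting logic; WordCountSketch and countWords are hypothetical names, and nothing here touches the Kafka Streams API.

```scala
object WordCountSketch {
  // Same steps as the topology: lower-case the text, split on non-word characters, count per word.
  def countWords(text: String): Map[String, Long] =
    text.toLowerCase
      .split("\\W+")
      .groupBy(identity)
      .map { case (word, occurrences) => word -> occurrences.length.toLong }

  val counts: Map[String, Long] = countWords("Hello Kafka Streams, All streams lead to Kafka")
}
```

Because the text is lower-cased first, "Kafka" and "kafka" as well as "Streams" and "streams" fall into the same bucket, which is why both words are expected to have a count of 2.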

Let me explain the classes used in testing the topology.

TopologyTestDriver makes it easier to write tests to verify the behaviour of topologies created with Topology or StreamsBuilder. You can test simple topologies that have a single processor, or very complex topologies that have multiple sources, processors, sinks, or sub-topologies.

The best thing about TopologyTestDriver is, it works without a real Kafka broker, so the tests execute very quickly with very little overhead.

Using the TopologyTestDriver in tests is easy: simply instantiate the driver with a Topology (for example, the result of StreamsBuilder#build()) and a Properties config, use the driver to supply an input message to the topology, and then use the driver to read and verify any messages output by the topology.

Although the driver doesn’t use a real Kafka broker, it does simulate a Kafka consumer and producer that read and write raw (byte[]) messages.
You can either deal with messages that have byte[] keys and values, or use ConsumerRecordFactory and OutputVerifier, which work with regular objects instead of raw bytes.

Driver Set-up:
In order to create a TopologyTestDriver instance, you need a Topology and a Properties.
The configuration needs to be representative of what you’d supply to the real topology, so that means including several key properties (StreamsConfig).
For example, the following code fragment creates a configuration that specifies a local Kafka broker list (which is needed but not used), a timestamp extractor, and default serializers and deserializers for string keys and values:

val props = new Properties()
props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091")
props.setProperty(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, classOf[CustomTimestampExtractor].getName)
props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass.getName)
props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass.getName)
val topology: Topology = ...
val driver = new TopologyTestDriver(topology, props)

Processing messages:
Here’s an example of an input message on the topic named input-topic.

 val factory = new ConsumerRecordFactory(strSerializer, strSerializer)
 driver.pipeInput(factory.create("input-topic","key1", "value1"))

When TopologyTestDriver#pipeInput() is called, it sends an input message with the given key, value, and timestamp on the specified topic to the topology and then commits the messages. The driver passes the input message through to the appropriate source that consumes the named topic and invokes the processor(s) downstream of the source.

If your topology’s processors forward messages to sinks, your test can then consume these output messages to verify they match the expected outcome.
For example, if our topology should have generated 2 messages on output-topic-1 and 1 message on output-topic-2, then our test can obtain these messages using the TopologyTestDriver#readOutput(String, Deserializer, Deserializer) method, which reads the next record from the given topic:

val record1: ProducerRecord[String, String] = driver.readOutput("output-topic-1", strDeserializer, strDeserializer)
val record2: ProducerRecord[String, String] = driver.readOutput("output-topic-1", strDeserializer, strDeserializer)
val record3: ProducerRecord[String, String] = driver.readOutput("output-topic-2", strDeserializer, strDeserializer)

Processor state:
Some processors use Kafka state storage (StateStore), so this driver class provides the generic getStateStore(store-name) method as well as store-type-specific methods, so that your tests can check the underlying state store(s) used by your topology’s processors.
In our previous example, after we supplied a single input message and checked the three output messages, our test could also check the key-value store to verify the processor correctly added, removed, or updated internal state.
Our test might have pre-populated some state before submitting the input message and verified afterwards that the processor(s) correctly updated the state.

Here is the Kafka-Streaming-Test GitHub code that I used in this blog post. In the next blog post, I will write about how to test complex topologies like joins/KTables.




When should you use Future/Future.successful/Future.failed?

In this blog post, I recommend when you should use Future()/Future.successful/Future.failed.

    1. Use Future.apply() or simply Future() (i.e., a Future block) when something needs to be done asynchronously and will complete some time in the future, typically time-consuming operations such as network calls, database operations, communication with one or more other services, or processing huge data across multiple cores.
    2. Use Future.successful when a literal or already computed value needs to be passed back as a successful future.
    3. Use Future.failed when a known exception needs to be passed back as a failed future without performing any further work.
    4. Use Future.fromTry when you have already computed a Try value.

In short, use Future.successful, Future.failed, and Future.fromTry when you need to create an instance of Future and you already have the value.
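The four cases can be sketched as follows. FutureConstructionSketch and slowLookup are hypothetical names; slowLookup only stands in for a slow network or database call.

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.Try

object FutureConstructionSketch {
  // Hypothetical stand-in for a time-consuming call (network, database, ...).
  def slowLookup(id: Int): Int = { Thread.sleep(50); id * 2 }

  // 1. Future.apply: the body is scheduled on the ExecutionContext and runs asynchronously.
  def async: Future[Int] = Future(slowLookup(21))

  // 2. Future.successful: the value is already known, so nothing is scheduled.
  val ready: Future[Int] = Future.successful(42)

  // 3. Future.failed: the exception is already known, so nothing is scheduled.
  val failed: Future[Int] = Future.failed(new IllegalArgumentException("invalid id"))

  // 4. Future.fromTry: wraps an already computed Try.
  val parsed: Future[Int] = Future.fromTry(Try("42".toInt))
}
```

Only the first variant needs a thread from the ExecutionContext; the other three are already completed when they are created, which is why they fit values you have in hand.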






Structured Streaming + Kafka Integration

In this post, I will show you how to create an end-to-end Structured Streaming pipeline. Let’s say we have a requirement like this:
JSON data is received in Kafka; we need to parse the nested JSON, flatten it, store it in a structured Parquet table, and get end-to-end failure guarantees.

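//Step-1 Creating a Kafka Source for Streaming Queries
val rawData = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092") // placeholder broker list
  .option("subscribe", "topic")
  .load()

//Step-2 Transforming Data (needs import org.apache.spark.sql.functions.{col, from_json})
val parsedData = rawData
  .selectExpr("cast (value as string) as json")
  .select(from_json(col("json"), schema).as("data"))
  .select("data.*") // flatten the nested struct into top-level columns

//Step-3 Writing Data to parquet
val query = parsedData.writeStream
  .format("parquet")
  .partitionBy("date") // assumes the parsed data has a date column
  .option("path", "/parquetTable") // placeholder output path
  .option("checkpointLocation", "/checkpoint")
  .start()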

Step-1: Reading Data from Kafka
Specify Kafka options to configure the source, then call .load():
How to connect to the Kafka servers?
kafka.bootstrap.servers => broker1,broker2
What to subscribe to?
subscribe => topic1,topic2,topic3 // fixed list of topics
subscribePattern => topic* // dynamic list of topics
assign => {"topicA":[0,1]} // specific partitions
Where to start reading?
startingOffsets => latest (default) / earliest / {"topicA":{"0":23,"1":345}}

Step-2: Transforming Data
Each row in the source(rawData) has the following schema:

Column         Type
key            binary
value          binary
topic          string
partition      int
offset         long
timestamp      long
timestampType  int

Cast the binary value to a string and name the column json
//selectExpr("cast (value as string) as json")
Parse the JSON string and expand it into nested columns; name the result data
//select(from_json(col("json"), schema).as("data"))

Step-3: Writing to Parquet
Save the parsed data as a Parquet table in the given path
Partition files by date so that future queries on time slices of the data are fast
Enable checkpointing by setting the checkpoint location to save offset logs
//.option("checkpointLocation", …)
start() actually starts a continuously running StreamingQuery in the Spark cluster

Stay tuned for next post. 🙂

Reference: https://spark.apache.org/docs/2.2.0/structured-streaming-kafka-integration.html

Easy, Scalable and Fault-tolerant Structured Streaming from Kafka to Spark

In this blog post, I will explain Spark Structured Streaming. Let’s first talk about what Structured Streaming is and how it works.
Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. In short, Structured Streaming provides fast, scalable, fault-tolerant, end-to-end exactly-once stream processing without the user having to reason about streaming. The Spark SQL engine takes care of running it incrementally and continuously and updating the final result as streaming data continues to arrive.

Let’s say you want to maintain a running program that prints data received from Kafka to the console (just an example). Below is how to express this with Structured Streaming.

First, create a local SparkSession, the starting point of all functionality related to Spark.

val spark = SparkSession.builder
    .master("local[*]") // local mode, as described above
    .config("spark.executor.cores", "2")
    .config("spark.executor.memory", "4g")
    .getOrCreate()

Next, let’s create a streaming DataFrame that represents data received from a Kafka server.

Specify one or more locations to read data from
Built-in support for Files/Kafka/Socket; pluggable
Can include multiple sources of different types using union()

val upstream = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "test-topic")
    .option("startingOffsets", "earliest")
    .load()

This upstream DataFrame represents an unbounded table containing the streaming data. The table has seven columns named key, value, topic, partition, offset, timestamp and timestampType, and each streaming record becomes a row in the table.
The upstream DataFrame has the following columns:

key(binary)  value(binary)  topic(string)  partition(int)  offset(long)  timestamp(long)  timestampType(int)
[binary]     [binary]       "topicA"       0               345           1486087873       0
[binary]     [binary]       "topicB"       3               2890          1486086721       0

For more information, see the Spark-Kafka Structured Streaming options.

val data = upstream.selectExpr("CAST(value AS STRING)")
val downstream = data.writeStream
    .format("console")
    .start()


So now you have transformed the DataFrame into a single column named “value” by casting the binary value to a string, and attached a console sink. All data coming from Kafka will be printed to the console.
Here is an example that receives data from multiple Kafka topics and partitions the data by topic name.

import java.util.UUID

val spark = SparkSession.builder
    .master("local[*]") // local mode, as in the previous example
    .config("spark.executor.cores", "2")
    .config("spark.executor.memory", "4g")
    .getOrCreate()
val checkpointLocation = "/tmp/temporary-" + UUID.randomUUID.toString
val upstream = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "test,Airport,Airports,Carriers,Planedata")
    .option("startingOffsets", "earliest")
    .load()
    .selectExpr("topic", "CAST(value AS STRING)") // Keep only the "topic" and "value" columns

  val downstream = upstream.writeStream
// Partition by topic. It will create one directory per topic name: topic=Airport, topic=Carriers, topic=Planedata
    .partitionBy("topic")
    .format("parquet") // file sink format (assumed)
    .option("path", "/tmp/data")
    .option("checkpointLocation", checkpointLocation)
    .start()


Here is complete source code.

Basic Concepts:

A query on the input will generate the “Result Table”. Every trigger interval (say, every 1 second), new rows get appended to the Input Table, which eventually updates the Result Table. Whenever the result table gets updated, we would want to write the changed result rows to an external sink.
Note that Structured Streaming does not materialize the entire table.

Input Sources: There are a few built-in sources.
File source – Reads files written in a directory as a stream of data. Supported file formats are text, csv, json, parquet.
Kafka source – Reads data from Kafka. It’s compatible with Kafka broker versions 0.10.0 or higher.
Socket source (for testing) – Reads UTF8 text data from a socket connection. The listening server socket is at the driver. Note that this should be used only for testing as this does not provide end-to-end fault-tolerance guarantees.
Rate source (for testing) – Generates data at the specified number of rows per second, each output row contains a timestamp and value.

There is a lot to explain about Structured Streaming, so I cannot cover everything in a single post, but I hope you now have a basic idea of how Structured Streaming works with Kafka.

Structured Streaming Programming Guide

Structured Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher)

An Easy and fast way of installing Java, Scala and Spark

1. Download and Install Java 8

abdhesh@abdhesh-latitude:~/Documents/Applications$ wget http://download.oracle.com/otn-pub/java/jdk/8u151-b12/e758a0de34e24606bca991d704f6dcbf/jdk-8u151-linux-x64.tar.gz

Extract tar file:

abdhesh@abdhesh-latitude:~/Documents/Applications$ tar -xf jdk-8u151-linux-x64.tar.gz 
abdhesh@abdhesh-latitude:~/Documents/Applications$ ls
jdk-8u151-linux-x64  jdk-8u151-linux-x64.tar.gz

Set environment path variable for Java

abdhesh@abdhesh-latitude:~/Documents/Applications$ vim ~/.bashrc

The above command opens the file; add the lines below at the end of it.

export JAVA=/home/abdhesh/Documents/Applications/jdk-8u151-linux-x64
export PATH=$JAVA/bin:$PATH

Save and exit. Now reload the .bashrc file in the same terminal session:

abdhesh@abdhesh-latitude:~/Documents/Applications$ source ~/.bashrc 

Run java version command:

abdhesh@abdhesh-latitude:~/Documents/Applications$ java -version
java version "1.8.0_151"
Java(TM) SE Runtime Environment (build 1.8.0_151-b12)
Java HotSpot(TM) 64-Bit Server VM (build 25.151-b12, mixed mode)

 2. Download and Install Scala

abdhesh@abdhesh-latitude:~/Documents/Applications$ wget https://downloads.lightbend.com/scala/2.12.4/scala-2.12.4.tgz

Extract tar file:

abdhesh@abdhesh-latitude:~/Documents/Applications$ tar -xf scala-2.12.4.tgz 
abdhesh@abdhesh-latitude:~/Documents/Applications$ ls
scala-2.12.4  scala-2.12.4.tgz

Set environment path variable for scala

abdhesh@abdhesh-latitude:~/Documents/Applications$ vim ~/.bashrc

The above command opens the file; add the lines below at the end of it.

export SCALA=/home/abdhesh/Documents/Applications/scala-2.12.4
export PATH=$JAVA/bin:$SCALA/bin:$PATH

Save and exit. Now reload the .bashrc file in the same terminal session:

abdhesh@abdhesh-latitude:~/Documents/Applications$ source ~/.bashrc 

Run scala version command:

abdhesh@abdhesh-latitude:~/Documents/Applications$ scala -version
Scala code runner version 2.12.4 -- Copyright 2002-2017, LAMP/EPFL and Lightbend, Inc.

3. Download and Install Apache Spark

abdhesh@abdhesh-latitude:~/Documents/Applications$ wget http://apache.mirror.anlx.net/spark/spark-2.2.1/spark-2.2.1-bin-hadoop2.7.tgz

Extract tar file:

abdhesh@abdhesh-latitude:~/Documents/Applications$ tar -xf spark-2.2.1-bin-hadoop2.7.tgz 
abdhesh@abdhesh-latitude:~/Documents/Applications$ ls
spark-2.2.1-bin-hadoop2.7  spark-2.2.1-bin-hadoop2.7.tgz

Set environment path variable for spark

abdhesh@abdhesh-latitude:~/Documents/Applications$ vim ~/.bashrc

The above command opens the file; add the lines below at the end of it.

export SPARK=/home/abdhesh/Documents/Applications/spark-2.2.1-bin-hadoop2.7
export PATH=$JAVA/bin:$SCALA/bin:$SPARK/bin:$PATH

Save and exit. Now reload the .bashrc file in the same terminal session:

abdhesh@abdhesh-latitude:~/Documents/Applications$ source ~/.bashrc 

Run Spark shell:

Here is link of people.json file

abdhesh@abdhesh-latitude:~/Documents/Applications$ spark-shell 
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
17/12/28 01:02:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/12/28 01:02:17 WARN Utils: Your hostname, abdhesh-latitude resolves to a loopback address:; using instead (on interface wlp2s0)
17/12/28 01:02:17 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Spark context Web UI available at
Spark context available as 'sc' (master = local[*], app id = local-1514422939241).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.2.1
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_151)
Type in expressions to have them evaluated.
Type :help for more information.

scala> val df  = spark.read.json("spark-2.2.1-bin-hadoop2.7/data/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> df.filter("age >= 19").select("name","age").show()
+------+---+
|  name|age|
+------+---+
|  Andy| 30|
|Justin| 19|
+------+---+

scala> //using Sql

scala> df.createOrReplaceTempView("people")

scala> spark.sql("SELECT * FROM people WHERE age >=19").show()
+---+------+
|age|  name|
+---+------+
| 30|  Andy|
| 19|Justin|
+---+------+


Stay tuned for next blog post 🙂

Part-1: Multiple ways of handling an exception in scala

There are multiple ways of handling an exception in Scala. In this blog, I will explain them one by one.

1. Using try/catch/finally

  val tryCatch = try {
    //Code here that might raise an exception
    throw new Exception
  } catch {
    case ex: Exception =>
      //Code here to handle the exception
  }

  val tryMultipleCatch = try {
    //Code here that might raise an exception
    throw new Exception
  } catch {
    case ae: ArithmeticException =>
    //Code here to handle an ArithmeticException
    case ex: Exception =>
    //Code here to handle any other Exception
  }

  val tryMultipleCatchFinally = try {
    //Code here that might raise an exception
    throw new Exception
  } catch {
    case ae: ArithmeticException =>
    //Code here to handle an ArithmeticException
    case ex: Exception =>
    //Code here to handle any other Exception
  } finally {
    //Code here will always be executed, whether an exception is thrown or not
  }

  val tryCatchWithValue: Int = try {
    //Code here that might raise an exception
    "Non-Numeric-Value".toInt
  } catch {
    case ne: NumberFormatException => 0
  }
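Since try/catch is an expression in Scala, it can be used directly as the body of a method. Here is a small sketch; TryCatchSketch, parseOrZero, parseWithCleanup and the cleanups counter are hypothetical names used only for illustration.

```scala
object TryCatchSketch {
  // try/catch yields a value: the parsed number, or 0 when parsing fails.
  def parseOrZero(raw: String): Int =
    try {
      raw.trim.toInt
    } catch {
      case _: NumberFormatException => 0
    }

  // Counts how many times the finally block has run.
  var cleanups: Int = 0

  // The finally block always runs, but its value is discarded;
  // the result comes from the try or the catch branch.
  def parseWithCleanup(raw: String): Int =
    try {
      raw.trim.toInt
    } catch {
      case _: NumberFormatException => -1
    } finally {
      cleanups += 1
    }
}
```

Both branches of the expression must produce the declared type (Int here), while the finally block is used only for its side effect.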

2. Using scala.util.Try
The Try type represents a computation that may either result in an exception, or return a successfully computed value.
Instances of Try[T], are either an instance of scala.util.Success[T] or scala.util.Failure[T]

Try can pipeline, or chain, operations while catching exceptions along the way, using combinators such as flatMap and map.

import scala.util.{Failure, Success, Try}

  val withTry = Try("1".toInt) // Success(1)
  withTry match {
    case Success(value) => println(value)
    case Failure(ex) =>
      //Code here to handle the exception
  }

  val tryWithRecover = Try("Non-Numeric-Value".toInt) match {
    case Success(value) => println(value)
    case Failure(ex) => println(ex)
  }

  //Try's map, flatMap, fold etc.
  def inc(n: Int): Int = n + 1

  val try1 = Try("abc".toInt)
  val tResult = try1.map(f => inc(f)) // `inc` runs only when `Try("abc".toInt)` doesn't raise an exception
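Chaining several steps that can each fail is where flatMap (or a for-comprehension, which desugars to flatMap and map) pays off. A minimal sketch, with TryChainingSketch, parse, divide and parseAndDivide as hypothetical names:

```scala
import scala.util.Try

object TryChainingSketch {
  def parse(raw: String): Try[Int] = Try(raw.trim.toInt)

  def divide(a: Int, b: Int): Try[Int] = Try(a / b)

  // Each step runs only if the previous one succeeded;
  // the first Failure short-circuits the rest of the chain.
  def parseAndDivide(x: String, y: String): Try[Int] =
    for {
      a <- parse(x)
      b <- parse(y)
      q <- divide(a, b)
    } yield q
}
```

With this shape the caller handles a single Try at the end instead of wrapping every step in its own try/catch.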

Try’s recover and recoverWith: Applies the given function f if this is a Failure, otherwise returns this if this is a Success.

//Recover with a value
val tryWithRecoverF = Try("Non-Numeric-Value".toInt).recover {
    //Here you pattern match on the type of the exception
    case ne: NumberFormatException => 0
    case ex: Exception => 0
}

//Recover with another Try
  def recoverWith(first: String, second: String): Try[Int] = {
    //The recoverWith block executes only when `Try(first.toInt)` raises an exception
    Try(first.toInt).recoverWith {
      case ne: NumberFormatException => Try(second.toInt)
    }
  }

Note: all Try combinators like map,flatMap, filter, fold, recover, recoverWith, transform, collect will catch exceptions

def compute(number: Int, divideBY: Int): Int = number / divideBY

  val t1 = Try("123".toInt).map(n => compute(n, 2)) //Success(61)
  val t2 = Try("123".toInt).map(n => compute(n, 0)) //Failure(java.lang.ArithmeticException: / by zero)

def computeWithTry(value: String): Try[Int] = Try(value.toInt)

  val r1: Try[Int] = computeWithTry("123")
  r1.fold(
    ex => println(ex),
    value => println(compute(value, 2))
  ) // 61

  computeWithTry("123").fold(
    ex => println(s"Exception--${ex}"),
    value => println(compute(value, 0))
  ) // Exception--java.lang.ArithmeticException: / by zero

  computeWithTry("abc").fold(
    ex => println(ex),
    value => println(compute(value, 2))
  ) // java.lang.NumberFormatException: For input string: "abc"

  computeWithTry("123").map(n => compute(n, 2)) //Success(61)
  computeWithTry("123").map(n => compute(n, 0)) //Failure(java.lang.ArithmeticException: / by zero)
  computeWithTry("abc").map(n => compute(n, 2)) //Failure(java.lang.NumberFormatException: For input string: "abc")

Note: only non-fatal exceptions are caught by the combinators on Try (see scala.util.control.NonFatal). Serious system errors, on the other hand, will be thrown.
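
A minimal sketch of that difference, assuming InterruptedException as the example of a fatal error (the object and member names are mine, only Try and NonFatal come from the standard library):

```scala
import scala.util.Try
import scala.util.control.NonFatal

object NonFatalSketch {
  // An ordinary exception is non-fatal, so Try captures it as a Failure.
  val captured: Try[Int] = Try[Int](throw new IllegalStateException("boom"))

  // InterruptedException is not matched by NonFatal, so Try lets it propagate
  // and we have to catch it ourselves.
  def interruptEscapes: Boolean =
    try {
      Try[Int](throw new InterruptedException("stop"))
      false
    } catch {
      case _: InterruptedException => true
    }

  // NonFatal can also be asked directly whether it would match a Throwable.
  val ordinaryIsNonFatal: Boolean = NonFatal(new IllegalStateException("boom"))
  val interruptIsNonFatal: Boolean = NonFatal(new InterruptedException("stop"))
}
```

In this sketch captured is a Failure, interruptEscapes returns true, ordinaryIsNonFatal is true and interruptIsNonFatal is false.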

Here you can find complete code

Stay tuned for next part 🙂

References: scala.util.Try

Methods with variable arguments

  • Scala methods can have variable arguments (vararg).
  • A method can be specified to have a variable number of arguments by adding a * after the type of the parameter.
  • A parameter list can have only one parameter with variable arguments, and it must be the last parameter, otherwise the compiler could not tell where the variable arguments end.
  • As an example, let’s define a method that takes a variable number of arguments of type String and that returns their concatenation as String:
scala> def concatStrings(s: String*): String = s.mkString
concatStrings: (s: String*)String

scala> concatStrings("a", "b", "c")
res0: String = abc

scala> def concatStringsSep(separator: String, s: String*): String =
     |   s.mkString(separator)
concatStringsSep: (separator: String, s: String*)String

scala> concatStringsSep("/", "a", "b", "c")
res1: String = a/b/c

You can also pass a sequence as variable-length arguments to a method by appending : _* to it.

scala> val listOfStrings = List("first","second","third")
listOfStrings: List[String] = List(first, second, third)

scala> concatStrings(listOfStrings:_*)
res5: String = firstsecondthird

scala> concatStringsSep(",",listOfStrings:_*)
res6: String = first,second,third
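
Inside the method body, the variable arguments are seen as a sequence, which is why mkString worked above. A small sketch of this, where sumAll and sumAllDoubled are hypothetical helpers I am adding only for illustration:

```scala
object VarargsSketch {
  // Inside the method, a repeated parameter `Int*` is seen as a Seq[Int],
  // so the usual collection methods are available on it.
  def sumAll(numbers: Int*): Int = numbers.sum

  // Hypothetical helper: forwards its own varargs to another vararg method
  // by expanding the sequence with `: _*`.
  def sumAllDoubled(numbers: Int*): Int = sumAll(numbers.map(_ * 2): _*)
}
```

Calling sumAll() with no arguments should give 0, sumAll(1, 2, 3) should give 6 and sumAllDoubled(1, 2, 3) should give 12.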

Scala pure/total functions


In this blog, I am going to explain pure/total functions.

Let’s first discuss what a function is.

A function is a process which takes some input, called arguments, and produces some output, called a return value.

A pure function is a function which:
1. Given the same input, will always return the same output (this is called determinism).
2. Produces no side effects.

A pure function describes how inputs relate to outputs, without spelling out the steps to get from A to B. Every function call must produce results in isolation. Pure functions are required to construct pure expressions.

The function result value cannot depend on any hidden information or state that may change while program execution proceeds or between different executions of the program, nor can it depend on any external input from I/O devices.
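
A minimal sketch of what "hidden state" means here, assuming a made-up counter and the hypothetical functions nextId and nextIdPure:

```scala
object HiddenStateSketch {
  // Hypothetical mutable state that lives outside the function.
  private var counter: Int = 0

  // Impure: the result depends on `counter`, and the call also changes it,
  // so two calls with the same argument give different results.
  def nextId(base: Int): Int = {
    counter += 1
    base + counter
  }

  // Pure: everything the result depends on is passed in as an argument.
  def nextIdPure(base: Int, counter: Int): Int = base + counter + 1
}
```

Two calls to nextId(10) return different values, while nextIdPure(10, 0) returns 11 every time it is called.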

They are easy to parallelize. Because pure functions are referentially transparent, we only need to compute their output once for a given input. Caching and reusing the result of a computation is called memoization, and it can only be done safely with pure functions.
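
Memoization could be sketched roughly like this. The memoize helper is my own assumption, not a standard library function, and the evaluations counter is a deliberate side effect added only to make the caching visible. The cache is a plain mutable map, so this sketch is not thread-safe.

```scala
import scala.collection.mutable

object MemoizeSketch {
  // Hypothetical helper: wraps a function with a cache keyed by its input.
  // This is only safe when `f` always returns the same output for the same input.
  def memoize[A, B](f: A => B): A => B = {
    val cache = mutable.Map.empty[A, B]
    (a: A) => cache.getOrElseUpdate(a, f(a))
  }

  // Counts real evaluations, only to show in this sketch when the cache is hit.
  var evaluations: Int = 0
  val slowSquare: Int => Int = (n: Int) => { evaluations += 1; n * n }
  val fastSquare: Int => Int = memoize(slowSquare)
}
```

Calling fastSquare(4) twice should return 16 both times, while evaluations stays at 1 because the second call is served from the cache.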

Pure functions are also extremely independent: they are easy to move around, refactor, and reorganize in your code, making your programs more flexible and adaptable to future changes.

In general,

Pure function = Output depends on input + No side effects

f: X -> Y means f is a function which takes X as input and returns Y as output.
Consider an expression like (f(3), f(3)), where we call the function f two times with the same input.
If f is a pure function, you can rewrite the expression: the common subexpression f(3) can be lifted out into a variable, because every call to f(3) gives the same result.
const x = f(3)
result = (x, x)

For example, these are pure functions:

  • sin(x), returning the sine of a number x
  • length(s), returning the size of a string s

Pure functions in Scala:

scala> def square(a: Int) = a * a
square: (a: Int)Int

scala> def pureFunc(x: Int, y: Int) = x + y
pureFunc: (x: Int, y: Int)Int

scala> pureFunc(1,2)
res0: Int = 3

scala> (pureFunc(1,2), pureFunc(1,2))
res1: (Int, Int) = (3,3)

scala> val fr = pureFunc(1,2)
fr: Int = 3

scala> (fr,fr) // fr is not evaluated again; the stored result is reused
res2: (Int, Int) = (3,3)

scala> def impureFunc(x: Int,y: Int) = println(x+y)
impureFunc: (x: Int, y: Int)Unit

scala> impureFunc(1,2)
3

scala> def anotherImpureFunc(x: Int) = if(x > 0) x
anotherImpureFunc: (x: Int)AnyVal

scala> anotherImpureFunc(2)
res2: AnyVal = 2

scala> anotherImpureFunc(-1)
res3: AnyVal = ()