
Spark: Why should we not use inferSchema = true with DataFrame?

In this blog post, I explain why we should not use inferSchema = true, the option that tells Spark to infer the schema automatically.

Here, the schema means the column types: a column can be of type String, Double, Long, etc.

If the schema is not specified using the schema function and the inferSchema option is enabled, Spark goes through the input once to determine the schema. In other words, inferring a schema costs an extra pass over the data.

If the schema is not specified using the schema function and the inferSchema option is disabled, Spark treats every column as a string and reads only the first line to determine the names and the number of fields.
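For example (a minimal sketch, assuming a SparkSession named spark and the engineer.csv sample shown further down), reading the file with neither a schema nor inferSchema leaves every column as a string:

val rawDF = spark.read
  .option("header", "true")
  .csv("src/main/resources/engineer.csv")

rawDF.printSchema()
// root
//  |-- name: string (nullable = true)
//  |-- department: string (nullable = true)
//  |-- years_of_experience: string (nullable = true)
//  |-- dob: string (nullable = true)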

In the examples below, I measure how long the same action takes with and without schema inference.

The time function measures how long a block of code takes to execute.


def time[A](name: String)(body: => A): A = {
    val start = System.currentTimeMillis()
    val result = body // evaluate the by-name block exactly once
    val end = System.currentTimeMillis()
    println(s"$name Took ${end - start} millis")
    result
  }
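For example, wrapping an arbitrary block:

time("sum 1 to 1,000,000") {
  (1L to 1000000L).sum
}
// prints something like: sum 1 to 1,000,000 Took 12 millis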

src/main/resources/engineer.csv

name,department,years_of_experience,dob
Abdhesh,Software Engineer,8,1990-07-20
Shikha,Fullstak Developer,9,1989-07-02

1- Using inferSchema = true:

import java.sql.Timestamp

case class Developer(
      name: String,
      department: String,
      years_of_experience: Int,
      dob: Timestamp
  )
  time("inferSchema = true") {
    val developerDF = spark.read
      .option("inferSchema", "true")
      .option("header", "true")
      .option("timestampFormat", "yyyy-MM-dd")
      .csv("src/main/resources/engineer.csv")
    import spark.implicits._
    val developerDS = developerDF.as[Developer]
    developerDS.collect().toList.foreach(println)
  }
Output:
Developer(Abdhesh,Software Engineer,8,1990-07-20 00:00:00.0)
Developer(Shikha,Fullstak Developer,9,1989-07-02 00:00:00.0)
inferSchema = true Took 18040 millis
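For reference, printing the inferred schema inside the same block (a quick check, not part of the original timing) should show something like this:

developerDF.printSchema()
// root
//  |-- name: string (nullable = true)
//  |-- department: string (nullable = true)
//  |-- years_of_experience: integer (nullable = true)
//  |-- dob: timestamp (nullable = true)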

2- Using an explicit schema (inferSchema = false):

time("inferSchema = false") {
    val schema = StructType(
      List(
        StructField("name", StringType, false),
        StructField("department", StringType, false),
        StructField("years_of_experience", IntegerType, false),
        StructField("dob", TimestampType, false)
      )
    )
    val developerDF = spark.read
      .option("header", "true")
      .schema(schema)
      .csv("src/main/resources/engineer.csv")
    import spark.implicits._
    val developerDS = developerDF.as[Developer]
    developerDS.collect().toList.foreach(println)
  }
Output:
Developer(Abdhesh,Software Engineer,8,1990-07-20 00:00:00.0)
Developer(Shikha,Fullstak Developer,9,1989-07-02 00:00:00.0)
inferSchema = false Took 718 millis
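As a side note, if you are on Spark 2.3 or later, the same schema can also be passed as a DDL string instead of building a StructType by hand (a sketch of the equivalent read):

val developerDF = spark.read
  .option("header", "true")
  .schema("name STRING, department STRING, years_of_experience INT, dob TIMESTAMP")
  .csv("src/main/resources/engineer.csv")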

If you don’t want to define the schema explicitly, you can derive it from an encoder.

time("inferSchema = false, derive schema from an encoder") {
    implicit val encoderDeveloper = Encoders.product[Developer]
    val developerDF = spark.read
      .option("header", "true")
      .schema(encoderDeveloper.schema)
      .csv("src/main/resources/engineer.csv")
    val developerDS = developerDF.as[Developer]
    developerDS.collect().toList.foreach(println)
  }
Output:
Developer(Abdhesh,Software Engineer,8,1990-07-20 00:00:00.0)
Developer(Shikha,Fullstak Developer,9,1989-07-02 00:00:00.0)
inferSchema = false, derive schema from an encoder Took 388 millis
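For reference, you can print the schema derived from the encoder to see exactly what it produces (a quick check; note that the nullability differs slightly from the hand-written StructType above):

println(Encoders.product[Developer].schema.treeString)
// root
//  |-- name: string (nullable = true)
//  |-- department: string (nullable = true)
//  |-- years_of_experience: integer (nullable = false)
//  |-- dob: timestamp (nullable = true)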

3- Infer schema dynamically

We can infer the schema dynamically from the first row of the CSV (after the header row) and then set it while reading the full CSV. This is a handy trick when you don’t know the schema of the CSV in advance.

 time("Infer schema from first row") {
    val developerDF1RowSchema = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .option("timestampFormat", "yyyy-MM-dd")
      .csv("src/main/resources/engineer.csv")
      .head()
      .schema

    // Second read: reuse the captured schema, so no inference pass is needed
    val developerDF = spark.read
      .option("header", "true")
      .option("timestampFormat", "yyyy-MM-dd")
      .schema(developerDF1RowSchema)
      .csv("src/main/resources/engineer.csv")

    import spark.implicits._
    val developerDS = developerDF.as[Developer]
    developerDS.collect().toList.foreach(println)
  }
Output:
Developer(Abdhesh,Software Engineer,8,1990-07-20 00:00:00.0)
Developer(Shikha,Fullstak Developer,9,1989-07-02 00:00:00.0)
Infer schema from first row Took 3570 millis

If you compare the first and second approaches, the processing time dropped by roughly 96%, and that was with only two records in the CSV file. Imagine a huge CSV file: you would get far better performance by defining the schema explicitly. So you should never use inferSchema = true. If you want, you can get the code from my GitHub repository.

Happy coding 🙂

An easy and fast way of installing Java, Scala and Spark

1. Download and Install Java 8


abdhesh@abdhesh-latitude:~/Documents/Applications$ wget http://download.oracle.com/otn-pub/java/jdk/8u151-b12/e758a0de34e24606bca991d704f6dcbf/jdk-8u151-linux-x64.tar.gz

Extract tar file:


abdhesh@abdhesh-latitude:~/Documents/Applications$ tar -xf jdk-8u151-linux-x64.tar.gz 
abdhesh@abdhesh-latitude:~/Documents/Applications$ ls
jdk-8u151-linux-x64  jdk-8u151-linux-x64.tar.gz

Set environment path variable for Java


abdhesh@abdhesh-latitude:~/Documents/Applications$ sudo vim ~/.bashrc

The above command will open the file; add the lines below at the end of it.


export JAVA=/home/abdhesh/Documents/Applications/jdk-8u151-linux-x64
export PATH=$JAVA/bin:$PATH

Save and exit. Now reload the .bashrc file in the same terminal session:


abdhesh@abdhesh-latitude:~/Documents/Applications$ source ~/.bashrc 

Run java version command:


abdhesh@abdhesh-latitude:~/Documents/Applications$ java -version
java version "1.8.0_151"
Java(TM) SE Runtime Environment (build 1.8.0_151-b12)
Java HotSpot(TM) 64-Bit Server VM (build 25.151-b12, mixed mode)

2. Download and Install Scala


abdhesh@abdhesh-latitude:~/Documents/Applications$ wget https://downloads.lightbend.com/scala/2.12.4/scala-2.12.4.tgz

Extract tar file:


abdhesh@abdhesh-latitude:~/Documents/Applications$ tar -xf scala-2.12.4.tgz 
abdhesh@abdhesh-latitude:~/Documents/Applications$ ls
scala-2.12.4  scala-2.12.4.tgz

Set environment path variable for Scala


abdhesh@abdhesh-latitude:~/Documents/Applications$ sudo vim ~/.bashrc
export SCALA=/home/abdhesh/Documents/Applications/scala-2.12.4
export PATH=$JAVA/bin:$SCALA/bin:$PATH

Save and exit. Now reload the .bashrc file in the same terminal session:


abdhesh@abdhesh-latitude:~/Documents/Applications$ source ~/.bashrc 

Run scala version command:


abdhesh@abdhesh-latitude:~/Documents/Applications$ scala -version
Scala code runner version 2.12.4 -- Copyright 2002-2017, LAMP/EPFL and Lightbend, Inc.

3. Download and Install Apache Spark


abdhesh@abdhesh-latitude:~/Documents/Applications$ wget http://apache.mirror.anlx.net/spark/spark-2.2.1/spark-2.2.1-bin-hadoop2.7.tgz

Extract tar file:


abdhesh@abdhesh-latitude:~/Documents/Applications$ tar -xf spark-2.2.1-bin-hadoop2.7.tgz 
abdhesh@abdhesh-latitude:~/Documents/Applications$ ls
spark-2.2.1-bin-hadoop2.7  spark-2.2.1-bin-hadoop2.7.tgz

Set environment path variable for Spark


abdhesh@abdhesh-latitude:~/Documents/Applications$ sudo vim ~/.bashrc

The above command will open the file; add the lines below at the end of it.


export SPARK=/home/abdhesh/Documents/Applications/spark-2.2.1-bin-hadoop2.7
export PATH=$JAVA/bin:$SCALA/bin:$SPARK/bin:$PATH

Now reload the .bashrc file in the same terminal session:


abdhesh@abdhesh-latitude:~/Documents/Applications$ source ~/.bashrc 

Run Spark shell:

Here is the link to the people.json file used in the examples below.


abdhesh@abdhesh-latitude:~/Documents/Applications$ spark-shell 
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
17/12/28 01:02:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/12/28 01:02:17 WARN Utils: Your hostname, abdhesh-latitude resolves to a loopback address: 127.0.1.1; using 192.168.0.16 instead (on interface wlp2s0)
17/12/28 01:02:17 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Spark context Web UI available at http://192.168.0.16:4040
Spark context available as 'sc' (master = local[*], app id = local-1514422939241).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.2.1
      /_/
         
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_151)
Type in expressions to have them evaluated.
Type :help for more information.

scala> 
scala> val df  = spark.read.json("spark-2.2.1-bin-hadoop2.7/data/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> df.filter("age >= 19").select("name","age").show()
+------+---+
|  name|age|
+------+---+
|  Andy| 30|
|Justin| 19|
+------+---+


scala> //using Sql

scala> df.createOrReplaceTempView("people")

scala> spark.sql("SELECT * FROM people WHERE age >=19").show()
+---+------+
|age|  name|
+---+------+
| 30|  Andy|
| 19|Justin|
+---+------+


scala> 
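As a small follow-up (not part of the session above, just a sketch), the same filter can also be written against a typed Dataset. Person here is a hypothetical case class matching the fields in people.json:

case class Person(name: String, age: Option[Long])

import spark.implicits._
val people = df.as[Person]
people.filter(_.age.exists(_ >= 19)).show()
// should list Andy (30) and Justin (19), just like the DataFrame and SQL versions above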

Stay tuned for the next blog post 🙂