Introduction to Apache Spark SQL

6 October 2017

Spark SQL

Spark SQL, previously known as Shark (SQL on Spark), is a Spark module for structured data processing. Unlike the basic Spark RDD API, the interfaces provided by Spark SQL give Spark more information about the structure of both the data and the computation being performed.

Internally, Spark SQL uses this extra information to perform extra optimizations. There are several ways to interact with Spark SQL including SQL and the Dataset API. When computing a result the same execution engine is used, independent of which API/language you are using to express the computation. This unification means that developers can easily switch back and forth between different APIs based on which provides the most natural way to express a given transformation.

Spark SQL Uses

One use case of Spark SQL is to execute SQL queries. Spark SQL can also be used to read data from an existing Hive installation. When running SQL from within another programming language the results will be returned as a Dataset/DataFrame. You can also interact with the SQL interface using the command-line or over JDBC/ODBC.

Spark Datasets

A Dataset is a distributed collection of data which can be constructed from JVM objects and then manipulated using functional transformations (map, flatMap, filter, etc.).

A DataFrame is a Dataset of Row objects (Dataset[Row]).

  • Datasets can wrap a given class or type (Dataset[Person], Dataset[(String, Double)])
  • A DataFrame's schema is only checked at runtime, whereas a Dataset's types are checked at compile time. Earlier detection of errors enables better optimization
  • RDDs can be converted to Datasets with .toDS() (see the sketch after this list)
  • Datasets are more efficient as they can be serialized very compactly - even better than Kryo serialization
  • Optimal execution plans can be determined at compile time
  • Datasets allow for better interoperability. In fact, MLlib and Spark Streaming are moving toward Datasets instead of RDDs for their primary API
  • Datasets simplify development, as SQL operations can be performed on them in a single line
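
A minimal sketch of the points above, assuming a SparkSession named spark and an illustrative Person case class:

case class Person(name: String, age: Long)

import spark.implicits._                                          // encoders for toDS()/toDF()

val personDS = Seq(Person("Ann", 30), Person("Bob", 25)).toDS()   // Dataset[Person]
val tupleDS  = Seq(("Ann", 1.0), ("Bob", 2.0)).toDS()             // Dataset[(String, Double)]

val fromRdd = spark.sparkContext
  .parallelize(Seq(Person("Cal", 41)))
  .toDS()                                                         // RDD converted with .toDS()

personDS.filter(_.age > 26).show()                                // typed, compile-time checked access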

Spark DataFrame

A DataFrame is a Dataset organized into named columns. It is Spark SQL’s primary data abstraction which represents a distributed collection of rows organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources such as: structured data files, tables in Hive, external databases, or existing RDDs.

DataFrame is a class defined in the Spark SQL library. It provides various methods for processing and analyzing structured data, for example methods for selecting columns, filtering rows, aggregating columns, joining tables, sampling data, and other common data processing tasks.

Unlike an RDD, a DataFrame is schema aware. An RDD is a partitioned collection of opaque elements, whereas a DataFrame knows the names and types of the columns in a dataset. As a result, the DataFrame class is able to provide a rich domain-specific language (DSL) for data processing, and the DataFrame API is easier to understand and use than the RDD API.

However, if required, a DataFrame can also be operated on as an RDD: an RDD can easily be created from a DataFrame, so the complete RDD interface is also available to process data represented by a DataFrame. Finally, a DataFrame can be registered as a temporary table, which can be queried with SQL or HiveQL; a temporary table is available only while the application that registered it is running.
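
A minimal sketch of the DSL described above, assuming a SparkSession named spark and an illustrative people.json file with name and age columns:

val df = spark.read.json("people.json")   // schema is inferred from the JSON data

df.printSchema()                          // column names and types
df.select("name", "age")                  // select columns
  .filter(df("age") > 21)                 // filter rows
  .groupBy("age")                         // aggregate
  .count()
  .show()

val asRdd = df.rdd                        // fall back to the RDD API when needed
df.createOrReplaceTempView("people")      // temporary table, queryable with SQL or HiveQL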

Create Spark Session

In Spark 2.0.0 or greater we create a SparkSession object instead of a SparkContext when using Spark SQL/Datasets.

The underlying SparkContext is still available from this SparkSession (as spark.sparkContext), while the session itself is used to issue SQL queries on Datasets and DataFrames.

The entry point to all the functionality in Spark SQL is the SparkSession class:

import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("Spark SQL Session")
  .config("spark.config.option", "config-value")
  .getOrCreate()

// For implicit conversions from RDDs to DataFrames
import spark.implicits._

val df = spark.read.json("file.json") // Create a DataFrame from a JSON file
// Display the content
df.show()

Extract Data from Spark SQL

val sqlDataFrame = spark.sql("SELECT * FROM people")
sqlDataFrame.show()  //Display the Data Frame
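
For the query above to resolve, a table or view named people must already be visible to Spark SQL; a minimal sketch, assuming an illustrative people.json input file:

val peopleDF = spark.read.json("people.json")   // load the data
peopleDF.createOrReplaceTempView("people")      // expose it as the SQL table "people"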

Saving Data to Persistent Tables in Spark SQL

DataFrames can also be saved as persistent tables into Hive metastore using the saveAsTable command. Unlike the createOrReplaceTempView command, saveAsTable will materialize the contents of the DataFrame and create a pointer to the data in the Hive metastore. Persistent tables will still exist even after your Spark program has restarted, as long as you maintain your connection to the same metastore. A DataFrame for a persistent table can be created by calling the table method on a SparkSession with the name of the table.

For file-based data sources such as text, parquet, json, and csv, you can specify a custom table path via the path option, e.g. df.write.option("path", "/some/path").saveAsTable("t"). When the table is dropped, the custom table path is not removed and the table data is still there.

If no custom table path is specified, Spark will write data to a default table path under the warehouse directory. When the table is dropped, the default table path will be removed too.
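
A minimal sketch of the behaviour described above, assuming an existing DataFrame df (the table names and path are illustrative):

// Managed table: data lives under the warehouse directory and is removed when the table is dropped
df.write.saveAsTable("people_tbl")

// Custom-path table: the data at /some/path survives even after the table is dropped
df.write.option("path", "/some/path").saveAsTable("people_ext")

// Re-create a DataFrame for a persistent table by name
val restored = spark.table("people_tbl")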

Partition Discovery in Spark SQL

Table partitioning is an optimization approach in which data are stored in different directories, with the partitioning column values encoded in the path of each partition directory. The Parquet data source is now able to discover and infer partitioning information automatically.

When writing data out (for example to a Hive table), .partitionBy is used to partition it; the output path below is illustrative:

df
  .write
  .partitionBy("partition")      // column whose values name the partition directories
  .parquet("/path/to/output")    // or .saveAsTable("t") when targeting a Hive table
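
Reading the base path back then lets Spark discover the partitions automatically; a minimal sketch, reusing the illustrative output path above:

val partitioned = spark.read.parquet("/path/to/output")   // partition directories are discovered
partitioned.printSchema()                                  // the "partition" column appears in the inferred schema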

Caching Data In Memory in Spark SQL

Tables can be cached in an in-memory columnar format by calling spark.catalog.cacheTable("table-name") or by calling .cache() on a DataFrame.

To remove a table from memory, use spark.catalog.uncacheTable("table-name").
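
A minimal sketch, assuming a table or temporary view named people has already been registered:

spark.catalog.cacheTable("people")                 // cache using the in-memory columnar format
spark.sql("SELECT COUNT(*) FROM people").show()    // this scan can be served from the cache
spark.catalog.uncacheTable("people")               // remove the table from memory again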

Data Modelling in Spark

Data modeling is a critical step in Spark SQL when we need to extract data from external databases such as Cassandra or other SQL databases.

Catalyst Optimizer in Spark

The Catalyst optimizer primarily leverages functional programming constructs of Scala such as pattern matching. It offers a general framework for transforming trees (such as a query plan or lineage graph) in Spark, which is used to perform analysis, optimization, planning, and runtime code generation.

Catalyst optimizer has two primary goals:

  • Make adding new optimization techniques easy

  • Enable external developers to extend the optimizer

Catalyst’s transformation phases in Spark SQL

1. Analyzing a logical plan to resolve references

2. Logical plan optimization

3. Physical planning

4. Code generation to compile the parts of the query to Java bytecode
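
These phases can be observed by calling explain(true) on a query, which prints the parsed, analyzed, and optimized logical plans followed by the selected physical plan; a minimal sketch, reusing the people view from earlier:

spark.sql("SELECT name FROM people WHERE age > 21").explain(true)
// prints the parsed/analyzed/optimized logical plans and the physical plan chosen by Catalyst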

Spark SQL optimization

1. spark.sql.codegen

  • Tells Spark SQL to compile each query to Java bytecode before executing it, which can speed up large or frequently repeated queries

2. spark.sql.inMemoryColumnarStorage.batchSize

  • Controls the number of rows per batch used for the in-memory columnar cache; larger batches improve memory utilization and compression but risk out-of-memory errors when caching data
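
Both settings can be changed at runtime through the session configuration; a minimal sketch using the option names listed above (the values shown are only illustrative):

spark.conf.set("spark.sql.codegen", "true")                             // compile each query to Java bytecode
spark.conf.set("spark.sql.inMemoryColumnarStorage.batchSize", "10000")  // rows per columnar cache batch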

