6 October 2017
Spark SQL, previously known as Shark (SQL on Spark), is a Spark module for structured data processing. Unlike the basic Spark RDD API, the interfaces provided by Spark SQL give Spark more information about the structure of both the data and the computation being performed.
Internally, Spark SQL uses this extra information to perform additional optimizations. There are several ways to interact with Spark SQL, including SQL and the Dataset API. When computing a result, the same execution engine is used, independent of which API/language you use to express the computation. This unification means that developers can easily switch back and forth between different APIs based on which provides the most natural way to express a given transformation.
One use case of Spark SQL is executing SQL queries. Spark SQL can also be used to read data from an existing Hive installation. When running SQL from within another programming language, the results are returned as a Dataset/DataFrame. You can also interact with the SQL interface using the command line or over JDBC/ODBC.
A Dataset is a distributed collection of data which can be constructed from JVM objects and then manipulated using functional transformations (map, flatMap, filter, etc.).
A DataFrame is a Dataset of Row objects (Dataset[Row]).
A DataFrame is a Dataset organized into named columns. It is Spark SQL's primary data abstraction: a distributed collection of rows with named columns, conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources, such as structured data files, tables in Hive, external databases, or existing RDDs.
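For illustration, here is a minimal sketch of both abstractions in Scala. The Person case class and the sample data are hypothetical, and a running SparkSession named spark is assumed:

```scala
import spark.implicits._

// A case class gives the Dataset its schema and element type
case class Person(name: String, age: Long)

// Dataset[Person]: typed, supports functional transformations
val ds = Seq(Person("Ann", 30), Person("Bob", 15)).toDS()
val adults = ds.filter(_.age >= 18)

// A DataFrame is just Dataset[Row]: untyped rows with named columns
val df = ds.toDF()
df.printSchema()
```

Note that the typed filter on the Dataset is checked at compile time, while the equivalent DataFrame operation would refer to columns by name at runtime.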
DataFrame is a class defined in the Spark SQL library. It provides various methods for processing and analyzing structured data: for example, methods for selecting columns, filtering rows, aggregating columns, joining tables, sampling data, and other common data processing tasks. Unlike RDD, DataFrame is schema aware. An RDD is a partitioned collection of opaque elements, whereas a DataFrame knows the names and types of the columns in a dataset. As a result, the DataFrame class is able to provide a rich domain-specific language (DSL) for data processing, and the DataFrame API is easier to understand and use than the RDD API. However, if required, a DataFrame can also be operated on as an RDD: an RDD can be easily created from a DataFrame, so the complete RDD interface is also available for data represented by a DataFrame. A DataFrame can also be registered as a temporary table, which can be queried with SQL or HiveQL. A temporary table is available only while the application that registered it is running.
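A short sketch of these three points, assuming a SparkSession named spark and a DataFrame df with (hypothetical) columns name and age:

```scala
import org.apache.spark.sql.functions._

// The DataFrame DSL: select, filter, aggregate
df.select("name", "age")
  .filter(col("age") > 21)
  .groupBy("age")
  .count()

// Drop down to the RDD API when needed: df.rdd is an RDD[Row]
val rowRdd = df.rdd

// Register a temporary table/view and query it with SQL
df.createOrReplaceTempView("people")
spark.sql("SELECT name FROM people WHERE age > 21").show()
```

The temporary view lives only for the lifetime of the SparkSession that registered it, matching the behavior described above.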
In Spark 2.0.0 or greater, we create a SparkSession object instead of a SparkContext when using Spark SQL/Datasets. The SparkSession class is the entry point to all functionality in Spark SQL: we use it to create Datasets and issue SQL queries on them (the underlying SparkContext remains available via spark.sparkContext if needed).
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("Spark SQL Session")
  .config("spark.config.option", "config-value")
  .getOrCreate()

// For implicit conversions from RDDs to DataFrames
import spark.implicits._

// Create a DataFrame from a JSON file
val df = spark.read.json("file.json")

// Display the content
df.show()
// Register the DataFrame as a temporary view so it can be queried with SQL
df.createOrReplaceTempView("people")

val sqlDataFrame = spark.sql("SELECT * FROM people")
sqlDataFrame.show() // Display the DataFrame
DataFrames can also be saved as persistent tables in the Hive metastore using the saveAsTable command. Unlike the createOrReplaceTempView command, saveAsTable will materialize the contents of the DataFrame and create a pointer to the data in the Hive metastore. Persistent tables will still exist after your Spark program has restarted, as long as you maintain your connection to the same metastore. A DataFrame for a persistent table can be created by calling the table method on a SparkSession with the name of the table.
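A minimal sketch of the round trip, assuming a SparkSession spark configured with Hive support and an existing DataFrame df (the table name is illustrative):

```scala
// Materialize the DataFrame as a persistent table in the metastore
df.write.saveAsTable("people_persisted")

// Later, even in a new application connected to the same metastore,
// recover a DataFrame for the table by name:
val restored = spark.table("people_persisted")
restored.show()
```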
For file-based data sources, e.g. text, parquet, json, or csv, you can specify a custom table path via the path option, e.g.
df.write.option("path", "/some/path").saveAsTable("t"). When the table is dropped, the custom table path is not removed and the table data is still there.
If no custom table path is specified, Spark will write data to a default table path under the warehouse directory. When the table is dropped, the default table path will be removed too.
Table partitioning is an optimization approach in which data are stored in different directories, with the partitioning column values encoded in the path of each partition directory. The Parquet data source is able to discover and infer partitioning information automatically.
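A sketch of what partition discovery looks like in practice; the directory layout and column names below are illustrative:

```scala
// On disk, a partitioned Parquet table might look like:
//   /data/events/year=2017/month=09/part-00000.parquet
//   /data/events/year=2017/month=10/part-00000.parquet
//
// Reading the root directory is enough; Spark infers the
// partitioning columns (year, month) from the directory names
// and adds them to the schema.
val events = spark.read.parquet("/data/events")
events.printSchema() // includes year and month as columns
```

Queries that filter on the partition columns (e.g. year = 2017) can then skip entire directories instead of scanning all the data.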
When writing data to a Hive table, partitionBy is used to partition the data:

df.write.partitionBy("columnName").saveAsTable("tableName")
A table can be cached in an in-memory columnar format by calling spark.catalog.cacheTable("tableName") or dataFrame.cache(). To remove the table from memory, we use spark.catalog.uncacheTable("tableName").
Data modeling is a critical step in Spark SQL when we need to extract data from databases like Cassandra or other SQL databases.
The Catalyst optimizer primarily leverages functional programming constructs of Scala, such as pattern matching. It offers a general framework for transforming trees in Spark, which is used to perform analysis, optimization, physical planning, and runtime code generation.
Catalyst optimizer has two primary goals:
Make adding new optimization techniques easy
Enable external developers to extend the optimizer
Spark SQL uses Catalyst's tree transformation framework in four phases:
1. Analyzing a logical plan to resolve references
2. Logical plan optimization
3. Physical planning
4. Code generation to compile the parts of the query to Java bytecode
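The output of these phases can be inspected directly. A small sketch, assuming a SparkSession spark with a registered temporary view people:

```scala
val q = spark.sql("SELECT name FROM people WHERE age > 21")

// explain(true) prints the parsed and analyzed logical plans,
// the optimized logical plan, and the final physical plan,
// corresponding to the Catalyst phases listed above.
q.explain(true)
```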
Source: Spark SQL Guide