Spark Learning Notes (2) - Spark SQL

Play With Big Data

Posted by Fan Gong on February 23, 2018

Spark SQL is a Spark module for structured data processing. One of the most important features of Spark SQL is that it extends the RDD to the DataFrame, which can:

  • Contain row objects
  • Run SQL queries
  • Carry a schema (leading to more efficient storage)
  • Read from and write to JSON

Before I go deeper into the Spark SQL module, let us talk about some terms I used last time: SparkContext is the entry gate to Apache Spark functionality. It allows your Spark application to access the Spark cluster with the help of a resource manager (Spark Standalone/YARN/Mesos).
To create a SparkContext, a SparkConf should be made first. The SparkConf holds the configuration parameters that our Spark driver application will pass to the SparkContext.

From Spark 2.0.0 onwards, SparkSession provides a single point of entry to interact with the underlying Spark functionality and allows programming Spark with the DataFrame and Dataset APIs. All the functionality available through SparkContext is also available in SparkSession. So, in order to use the SQL, Hive, and Streaming APIs, there is no need to create separate contexts, since SparkSession includes all of them.
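
To make the relationship concrete, here is a minimal sketch contrasting the pre-2.0 entry point (SparkConf plus SparkContext) with the unified SparkSession; the master URL and app name are just placeholders:

# The pre-2.0 entry point: build a SparkConf, then pass it to a SparkContext
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local").setAppName("try")
sc = SparkContext(conf = conf)

# The Spark 2.0+ entry point: SparkSession wraps a SparkContext internally
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("try").getOrCreate()
print(spark.sparkContext)  # the underlying SparkContext is still accessible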

1. Create DataFrame

First, let us have a brief overview of what Datasets and DataFrames are:
Dataset: a distributed collection of structured data with a strongly typed API.
DataFrame: a Dataset of Row objects. It is conceptually equivalent to a table in a relational database or a data frame in Python.

In this post I will mainly use DataFrames, since PySpark still does not fully support the Dataset API.

There are three ways to create a DataFrame:

  1. From existing RDD
  2. From a Hive table
  3. From Spark supported data sources

Here is an example:

from pyspark.sql import SparkSession
from pyspark.sql import Row
import pandas as pd

spark = SparkSession.builder.master("local").appName("try").getOrCreate()

# Create from existing RDD
rdd = spark.sparkContext.textFile('../fakefriends.csv')
rdd = rdd.map(lambda x: x.split(',')) # --> [['0','will','33','385']]
df_row = rdd.map(lambda x: Row(ID = int(x[0]), name = x[1], age = int(x[2]), numFriends = int(x[3])))
df = spark.createDataFrame(df_row)
df.show()

# Create from pandas.DataFrame
fake_dic = {'id':[1,2,3], 'age' : [2,3,3], 'class' : [1,1,2]}
fake_df = pd.DataFrame(fake_dic)
df_pd = spark.createDataFrame(fake_df) # keep a separate name so we don't overwrite the fakefriends df used below
df_pd.show()

# Load from data sources
df_csv = spark.read.load('../fakefriends.csv', format = 'csv')
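
A note on the last loader (this aside is mine, not part of the original example): reading a CSV this way gives generic string columns named _c0, _c1, and so on. If the file had a header row, the standard DataFrameReader options header and inferSchema would pick up the column names and types:

# Illustration only: fakefriends.csv has no header row, so header=True
# would wrongly treat the first data row as column names
df_csv2 = spark.read.csv('../fakefriends.csv', header = True, inferSchema = True)
df_csv2.printSchema()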

2. DataFrame's Operations

After we have the Spark DataFrame, we could then use SQL-like functions to manipulate the data (very similar to the dplyr package in R).

At the same time, we could also write raw SQL queries.

Here is an example:

# Target: find the person who has the most friends among people aged 20 or younger

## Use DataFrame operators first
df.filter(df['age'] <= 20).orderBy('numFriends', ascending = False).select('name').show()

## Use a SQL query
df.createOrReplaceTempView('people') # need to register the DataFrame as a SQL temp view
sqlDF = spark.sql("SELECT name FROM people WHERE age <= 20 ORDER BY numFriends DESC")
sqlDF.show()

We can also create a global temporary view if we want a view that is shared among all sessions and kept alive until the Spark application terminates.
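
As a quick sketch of that (the view name people_global is my own placeholder), a global temporary view lives in the system database global_temp and must be referenced through it:

# Register a global temporary view, shared across sessions of this application
df.createGlobalTempView('people_global')
spark.sql("SELECT name FROM global_temp.people_global WHERE age <= 20").show()

# It is also visible from a new session of the same application
spark.newSession().sql("SELECT COUNT(*) FROM global_temp.people_global").show()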

3. Parquet Files

Parquet is a columnar format, supported by many data processing systems. The advantages of having a columnar storage are as follows:

  • Columnar storage limits IO operations
  • Columnar storage can fetch specific columns that you need to access
  • Columnar storage consumes less space
  • Columnar storage gives better summarization of the data and supports type-specific encoding

Spark SQL provides support for both reading and writing Parquet files, and it automatically preserves the schema of the original data. The reading and writing procedure is the same as for JSON datasets.

Here is an example:

## write parquet file
df.write.parquet('people.parquet')

## read parquet file
p_file = spark.read.parquet('./people.parquet') #--> output a DataFrame
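
Since the schema travels with the Parquet file, the loaded DataFrame can be queried right away; here is a small sketch (the view name parquet_people is my own):

p_file.printSchema()  # the schema was stored inside the Parquet file itself
p_file.createOrReplaceTempView('parquet_people')
spark.sql("SELECT name, numFriends FROM parquet_people ORDER BY numFriends DESC").show()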

4. Summary

There are also some data sources that I am not going to talk about here, such as JSON datasets and Hive tables. The reasons are as follows:

  • In the PySpark API the DataFrame is untyped, so it is hard to appreciate the difference between Datasets and DataFrames
  • Also, the processing methods are very similar across these different data formats

So I will get back to Datasets later when the Python API is complete or when I start to learn Scala. In the next post, let us see how MLlib works in PySpark.