Spark Learning Notes (1) - Spark Core

Play With Big Data

Posted by Fan Gong on February 2, 2018

Nowadays, the ability to deal with big data problems is a necessity for becoming a data scientist. So recently I have been trying to learn some big data knowledge.

The reason I started with Spark is that, first, it has a Python API, and second, it is more flexible than Hadoop. Later on I will come back to the Hadoop ecosystem, since the two have a very close relationship (as far as I know, Spark often runs on a cluster through Hadoop YARN).

My learning materials include the online course Taming Big Data with Apache Spark and Python and the official Spark documentation from the Spark website. I think this documentation is pretty good compared with some other obscure and elusive official documentation I have read, such as TensorFlow's. :D

Without further ado, let me summarize what I have learned about Spark Core.

1. Overview

To explain what Spark is, I would like to quote this sentence from the documentation:

Every Spark application consists of a driver program that runs the user’s main function and executes various parallel operations on a cluster.

Taking this sentence literally: Spark uses parallel operations to solve our problems efficiently; at the same time, we programmers write the main function using the various APIs and configure the cluster so that Spark can successfully operate on it.

It sounds easy, but the partitioning of tasks or the parallel operations themselves can easily go wrong. So we need a special data structure designed for parallel computation. Here comes the resilient distributed dataset (RDD), which is a collection of elements partitioned across the nodes of the cluster that can be operated on in parallel. Besides, we may need shared variables that can be used in parallel operations. Next, let us get a more detailed idea about Spark.

2. Resilient Distributed Dataset (RDD)

The RDD is one of the most important abstractions in Spark: it is a fault-tolerant collection of elements that can be operated on in parallel. In this chapter I will focus on three parts of RDDs:

  • How to create an RDD
  • What the RDD operations are
  • How RDDs are stored

2.1 Create RDD

There are two ways to create an RDD:

  1. Parallelized Collections: the elements of an existing collection are copied to form a distributed dataset that can be operated on in parallel.
  2. External Datasets: PySpark can create distributed datasets from any storage source supported by Hadoop, the local file system, and so on. The most commonly used method is sc.textFile().

Here is a simple example:

from pyspark import SparkContext, SparkConf
import numpy as np

conf = SparkConf().setMaster('local').setAppName('try') # 'local' means run Spark locally with a single worker thread
sc = SparkContext(conf = conf)

# Create an RDD from Parallelizing
lst = np.array([[1,1],[2,2]])
rdd1 = sc.parallelize(lst)

# Create an RDD from the local text file
rdd2 = sc.textFile('Book.txt')

2.2 RDD Operations

RDD supports two types of operations: transformations, which create a new dataset from an existing one, and actions, which return a value to the driver program after running a computation on the dataset.

The transformations are only computed when an action is performed. This design is called lazy evaluation, and it enables Spark to run more efficiently.
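
For instance, here is a minimal sketch of lazy evaluation (assuming sc is an existing SparkContext, as created above): the map() call returns immediately without touching the data, and the actual computation only happens when the reduce() action runs.

nums = sc.parallelize([1, 2, 3, 4])
squared = nums.map(lambda x: x * x)          # transformation: nothing is computed yet
total = squared.reduce(lambda x, y: x + y)   # action: triggers the actual computation
print(total)                                 # 30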

I will not list the specific transformation and action functions here; please refer to the official documentation. What I want to do instead is illustrate how to use RDD operations through examples.

From what I have learned so far, for RDDs (not talking about DataFrames and Datasets), it is very common to transform them into key-value pairs first and then apply lambdas or our own functions in the RDD operations, since these operations usually require us to pass in a function. In Python, a lambda is a very convenient way to create a simple function; we can also define our own functions for dealing with more complex RDDs.

Suppose our target is to find the word that is used most frequently in one book. Here is what we could do in PySpark:

from pyspark import SparkConf, SparkContext
import re

conf = SparkConf().setMaster('local').setAppName('Mytry2') # run locally
sc = SparkContext(conf = conf)

# Construct our own function
def delete_punc(text):
    find_punc = re.compile(r'\W+', re.UNICODE)  # matches runs of non-word characters
    return find_punc.split(text.lower())        # split the lowercased text on them, which drops the punctuation

# Create RDD by using external files
rdd = sc.textFile('./Book.txt')
# Use our own function
rdd_each_word = rdd.flatMap(delete_punc)
# Use lambda function to create key-pairs
rdd_count = rdd_each_word.map(lambda x: (x,1)).reduceByKey(lambda x,y: x + y)
# Swap to (count, word) pairs and sort by count (ascending by default)
rdd_count_sort = rdd_count.map(lambda x: (x[1],x[0])).sortByKey().collect()

for i in rdd_count_sort:
    count = str(i[0])
    words = i[1].encode('ascii', 'ignore')
    print(words.decode() + ':\t ' + count)

## results:
##
##  your:    1420
##  to:      1828
##  you:     1878
##

This example illustrates how we can use our own functions, lambda functions, and key-value pairs to solve a counting problem. The amazing part is that we can use the abundant Python modules inside RDD operations.

2.3 RDD Storage

Because of lazy evaluation, each transformed RDD may be recomputed every time you run an action on it. So we can use the persist() or cache() method to keep an RDD in memory. The difference is that cache() only uses the default storage level MEMORY_ONLY, while persist() lets you specify whichever storage level you want.

Here is an example:

from pyspark import StorageLevel

# Same boilerplate as the above example
rdd_count.persist(StorageLevel.DISK_ONLY)   # same as StorageLevel(True, False, False, False, 1)
rdd_count.is_cached                         # --> True
rdd_count.unpersist().is_cached             # --> False
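
As a quick sketch of the difference mentioned above (reusing rdd_count after it has been unpersisted), cache() is simply shorthand for persisting at the default MEMORY_ONLY level:

rdd_count.cache()    # equivalent to rdd_count.persist(StorageLevel.MEMORY_ONLY)
rdd_count.is_cached  # --> True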

3. Shared Variables

We know that Spark relies on parallel computation, which means several executors work together. Supporting general, read-write shared variables across tasks would be inefficient, so Spark provides two limited types of shared variables: broadcast variables and accumulators.

3.1 Broadcast Variables

Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. It is useful when tasks across multiple stages need the same data or when caching the data in deserialized form is important.

Here is an example where I use sc.broadcast(). We have two datasets: one contains each movie id together with its ratings from customers, and the other just contains the matching movie name for each movie id. If we care about the movie names, we can put the name dataset into a broadcast variable, like this:

# Function to clean movie name data
def loadnames():
    movieNames = {}
    with open('ml-100k/u.item', encoding = "ISO-8859-1") as f:
        for line in f:
            fields = line.split('|')
            movieNames[int(fields[0])] = fields[1]
    return movieNames
# Construct the broadcast variable
nameDict = sc.broadcast(loadnames())
    ....
    ....
# ignore some middle steps.
# Remember to use .value to extract the information of this broadcast variable
result_names = result_sorted.map(lambda x: (nameDict.value[x[1]],x[0])).collect()

3.2 Accumulators

Accumulators are variables that are used for aggregating information across the executors. For example, this information can pertain to data or API diagnostics, such as how many records are corrupted or how many times a particular library API was called.

Basically, we first create an accumulator object with accum = sc.accumulator(0), then use accum.add(n) to add n to the accumulator. We can also use accum.value to see the accumulator's current value.

One thing to keep in mind is that, due to lazy evaluation, the transformations are not executed unless an action happens on the RDD. As a result, accumulators used inside functions like map() or filter() won't get updated unless some action happens on the RDD.
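
To make this concrete, here is a minimal sketch (assuming sc is an existing SparkContext and rdd is an RDD of text lines, e.g. from sc.textFile('Book.txt')) that counts blank lines with an accumulator:

blank_lines = sc.accumulator(0)

def count_blank(line):
    if line.strip() == '':
        blank_lines.add(1)               # each executor adds to the accumulator
    return line

rdd_checked = rdd.map(count_blank)       # transformation: the accumulator is not updated yet
rdd_checked.count()                      # action: now count_blank actually runs
print(blank_lines.value)                 # only the driver can read the value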


All right. I think I have covered the basics of Spark Core. Let me talk about Spark SQL next time.