Introduction

Create the worker node.

az vm create \
   --resource-group Py4SAS \
   --name Spark-node-vm \
   --image OpenLogic:CentOS:7.6:latest \
   --size Standard_DS2_v2 \
   --generate-ssh-keys \
   --data-disk-sizes-gb 256

Returns.

{
  "fqdns": "",
  "id": "/subscriptions/e452751c-5818-4258-993b-247904c15f97/resourceGroups/Py4SAS/providers/Microsoft.Compute/virtualMachines/Spark-node-vm",
  "location": "eastus",
  "macAddress": "00-0D-3A-53-F0-4E",
  "powerState": "VM running",
  "privateIpAddress": "10.0.0.5",
  "publicIpAddress": "40.XX.XXX.XXX",
  "resourceGroup": "Py4SAS",
  "zones": ""


Attach additional disk to Spark-node-vm.

az vm disk attach \
     --resource-group Py4SAS \
     --vm-name Spark-node-vm \
     --name Py4SAS-disk \
     --size-gb 256 \
     --sku Premium_LRS \
     --new


Confirm Java

ssh to Master node confirm Java version.

java -version

Returns.

openjdk version "1.8.0_212"
OpenJDK Runtime Environment (build 1.8.0_212-b04)
OpenJDK 64-Bit Server VM (build 25.212-b04, mixed mode)


ssh to slave node confirm Java version.

java -version

Returns.

openjdk version "1.8.0_212"
OpenJDK Runtime Environment (build 1.8.0_212-b04)
OpenJDK 64-Bit Server VM (build 25.212-b04, mixed mode)


Prepare master node


On Master update /etc/hosts

127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4
40.xx.xx.xxx Spark-node-vm
::1         localhost localhost.localdomain localhost6 localhost6.localdomain6


Generate ssh key-pair on the master node.

 ssh-keygen -t rsa -P ""

Returns.

Generating public/private rsa key pair.
Enter file in which to save the key (/home/trb/.ssh/id_rsa):
Your identification has been saved in /home/trb/.ssh/id_rsa.
Your public key has been saved in /home/trb/.ssh/id_rsa.pub.
The key fingerprint is:
SHA256:twHxsmz/hCNbRRo/BxFHPYdUP5UA6h4n6t9rWsj5Z64 trb@Py4SAS-vm
The key's randomart image is:
+---[RSA 2048]----+
|        .  .+==++|
|         o.  o.o=|
|        o.o o  .+|
|       ..+ = .  .|
|        S++.+ .  |
|       .+o*= o   |
|       ..=*..    |
|      .  ++= o   |
|       .oooEB.   |
+----[SHA256]-----+


Append the contents of $HOME/.ssh/id_rsa.pub to $HOME/.ssh/authorized_keys of both master as well as worker nodes.

cat $HOME/.ssh/id_rsa.pub

Returns.

ssh-rsa AAAAB3NzaC1. . . . 


Install Spark on the master node.

cd /tmp
curl -O https://d3kbcqa49mib13.cloudfront.net/spark-2.2.0-bin-hadoop2.7.tgz

Returns.

% Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  194M  100  194M    0     0  64.9M      0  0:00:02  0:00:02 --:--:-- 64.8M


Unpack the tar ball and move the files to /opt.

tar zxvf spark-2.2.0-bin-hadoop2.7.tgz
mv spark-2.2.0-bin-hadoop2.7 /opt


Add environment variables to $HOME/.bashrc

export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.212.b04-0.el7_6.x86_64
export SPARK_HOME=/opt/spark-2.2.0-bin-hadoop2.7
export PATH=$PATH:$SPARK_HOME/bin


Make the environment variables knows to the current session.

source ~/.bashrc


Prepare the spark-env.sh file.

cp $SPARK_HOME/conf/spark-env.sh.template  $SPARK_HOME/conf/spark-env.sh 


Add the following configuration items to $SPARK_HOME/conf/spark-env.sh.

export JAVA_HOME=/opt/jdk1.8.0_131/
export SPARK_EXECUTOR_CORES=1
export SPARK_EXECUTOR_MEMORY=1800M
export SPARK_WORKER_INSTANCES=2 


Prepare the slaves file.

cd $SPARK_HOME/conf
cp slaves.template slaves

Adding the hostname of the slave node to the slaves file.

Spark-node-vm


Create configured tar ball setup.

cd /opt
tar czf spark.tar.gz spark-2.2.0-bin-hadoop2.7


Prepare the slave node

Copy the configured tar ball from the master to the slave node.

scp spark.tar.gz Spark-node-vm:/tmp


ssh to the slave node and unpack the configured tar ball.

cd /tmp
tar xzvf spark.tar.gz


Initialize Spark cluster

From the master node start the Spark cluster.

$SPARK_HOME/sbin/start-all.sh

Returns.

starting org.apache.spark.deploy.master.Master, logging to /opt/spark-2.2.0-bin-hadoop2.7/logs/spark-trb-org.apache.spark.deploy.master.Master-1-Py4SAS-vm.out
Spark-node-vm: starting org.apache.spark.deploy.worker.Worker, logging to /opt/spark-2.2.0-bin-hadoop2.7/logs/spark-trb-org.apache.spark.deploy.worker.Worker-1-Spark-node-vm.out
Spark-node-vm: starting org.apache.spark.deploy.worker.Worker, logging to /opt/spark-2.2.0-bin-hadoop2.7/logs/spark-trb-org.apache.spark.deploy.worker.Worker-2-Spark-node-vm.out


PySpark

Add the bin directory for Spark to the $PATH environment variable in the ~/.bashrc file.

export PATH=$PATH:/opt/spark-2.2.0-bin-hadoop2.7/bin
source ~/.bashrc


Open port 7077.

az vm open-port --priority 1010 --resource-group Py4SAS --name Py4SAS-vm --port 7077


Test out the cluster by grabbing this example PySpark code as a shake-down test.   It generates 1,000,000 records with key/value pairs 100 bytes in length and sorts on the key.

spark-submit --master spark://localhost:7077 core_test.py -a


The script returns timing statistics among copious amount of logging.

19/07/02 22:34:23 INFO DAGScheduler: ResultStage 4 (count at /home/trb/pyspark/core_test.py:104) finished in 8.843 s
19/07/02 22:34:23 INFO DAGScheduler: Job 3 finished: count at /home/trb/pyspark/core_test.py:104, took 33.756283 s


PySpark Examples

Make the findspark library available to the conda environment so we can run PySpark scripts out of a Jupyter notebook.

 conda install -c conda-forge findspark


Enable execution of PySpark from a Juputer notebook by adding these environment variables to ~/.bashrc

export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS='notebook'


Source the .bashrc file.

source ~/.bashrc


Start the notebook.

jupyter notebook --no-browser --port 8888 --ip=0.0.0.0 --notebook-dir=$HOME/notebook


Initialize the Spark context.

import findspark
findspark.init()
import pyspark
from pyspark import SparkContext
sc =SparkContext()


Initialize the SQLContext and read the .csv file.

from pyspark.sql import SQLContext
url = "https://raw.githubusercontent.com/thomaspernet/data_csv_r/master/data/adult.csv"
from pyspark import SparkFiles

sc.addFile(url)
sqlContext = SQLContext(sc)


Create the df DataFrame. inferSchema = True allows the sqlContext.read.csv method to establish column types it can infer.

df = sqlContext.read.csv(SparkFiles.get("adult.csv"), header=True, inferSchema = True)


Return the schema for the df DataFrame.

df.printSchema()

Returns.

root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: integer (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: integer (nullable = true)
 |-- capital_loss: integer (nullable = true)
 |-- hours_week: integer (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)


Read the .csv file to create the df_string DataFrame this time with the inferSchema = False argument and print the schema.

df_string = sqlContext.read.csv(SparkFiles.get("adult.csv"), header=True, inferSchema =  False)
df_string.printSchema()

Returns.

root
 |-- age: string (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: string (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: string (nullable = true)
 |-- capital_loss: string (nullable = true)
 |-- hours_week: string (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)


Write a function to convert a list of columns from type string to a new type, in this case float.

from pyspark.sql.types import *

def convertColumn(df, names, newType):
    for name in names: 
        df = df.withColumn(name, df[name].cast(newType))
    return df 

# List of target columns to convert
CONTI_FEATURES  = ['age', 'fnlwgt','capital_gain', 'education_num', 'capital_loss', 'hours_week']

# Convert the type
df_string = convertColumn(df_string, CONTI_FEATURES, FloatType())


Validate the results.

df_string.printSchema()

Returns.

root
 |-- age: float (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: float (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: float (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: float (nullable = true)
 |-- capital_loss: float (nullable = true)
 |-- hours_week: float (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)


Fetch the first 5 rows for the age and fnlwgt columns.

df.select('age','fnlwgt').show(5)

Returns.

+---+------+
|age|fnlwgt|
+---+------+
| 39| 77516|
| 50| 83311|
| 38|215646|
| 53|234721|
| 28|338409|
+---+------+
only showing top 5 rows


Group by education and return a count in ascending order.

df.groupBy("education").count().sort("count",ascending=True).show()
+------------+-----+
|   education|count|
+------------+-----+
|   Preschool|   51|
|     1st-4th|  168|
|     5th-6th|  333|
|   Doctorate|  413|
|        12th|  433|
|         9th|  514|
| Prof-school|  576|
|     7th-8th|  646|
|        10th|  933|
|  Assoc-acdm| 1067|
|        11th| 1175|
|   Assoc-voc| 1382|
|     Masters| 1723|
|   Bachelors| 5355|
|Some-college| 7291|
|     HS-grad|10501|
+------------+-----+


Crosstabs for age by label.

df.crosstab('age', 'label').sort("age_label").show()

Returns.

+---------+-----+----+
|age_label|<=50K|>50K|
+---------+-----+----+
|       17|  395|   0|
|       18|  550|   0|
|       19|  710|   2|
|       20|  753|   0|
|       21|  717|   3|
|       22|  752|  13|
|       23|  865|  12|
|       24|  767|  31|
|       25|  788|  53|
|       26|  722|  63|
|       27|  754|  81|
|       28|  748| 119|
|       29|  679| 134|
|       30|  690| 171|
|       31|  705| 183|
|       32|  639| 189|
|       33|  684| 191|
|       34|  643| 243|
|       35|  659| 217|
|       36|  635| 263|
+---------+-----+----+
only showing top 20 rows