Introduction
Create the worker node.
az vm create \
--resource-group Py4SAS \
--name Spark-node-vm \
--image OpenLogic:CentOS:7.6:latest \
--size Standard_DS2_v2 \
--generate-ssh-keys \
--data-disk-sizes-gb 256
Returns.
{
"fqdns": "",
"id": "/subscriptions/e452751c-5818-4258-993b-247904c15f97/resourceGroups/Py4SAS/providers/Microsoft.Compute/virtualMachines/Spark-node-vm",
"location": "eastus",
"macAddress": "00-0D-3A-53-F0-4E",
"powerState": "VM running",
"privateIpAddress": "10.0.0.5",
"publicIpAddress": "40.XX.XXX.XXX",
"resourceGroup": "Py4SAS",
"zones": ""
Attach an additional disk to Spark-node-vm.
az vm disk attach \
--resource-group Py4SAS \
--vm-name Spark-node-vm \
--name Py4SAS-disk \
--size-gb 256 \
--sku Premium_LRS \
--new
Confirm Java
ssh to the master node and confirm the Java version.
java -version
Returns.
openjdk version "1.8.0_212"
OpenJDK Runtime Environment (build 1.8.0_212-b04)
OpenJDK 64-Bit Server VM (build 25.212-b04, mixed mode)
ssh to the slave node and confirm the Java version.
java -version
Returns.
openjdk version "1.8.0_212"
OpenJDK Runtime Environment (build 1.8.0_212-b04)
OpenJDK 64-Bit Server VM (build 25.212-b04, mixed mode)
Prepare the master node
On the master node, update /etc/hosts.
127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4
40.xx.xx.xxx Spark-node-vm
::1 localhost localhost.localdomain localhost6 localhost6.localdomain6
Generate an SSH key pair on the master node.
ssh-keygen -t rsa -P ""
Returns.
Generating public/private rsa key pair.
Enter file in which to save the key (/home/trb/.ssh/id_rsa):
Your identification has been saved in /home/trb/.ssh/id_rsa.
Your public key has been saved in /home/trb/.ssh/id_rsa.pub.
The key fingerprint is:
SHA256:twHxsmz/hCNbRRo/BxFHPYdUP5UA6h4n6t9rWsj5Z64 trb@Py4SAS-vm
The key's randomart image is:
+---[RSA 2048]----+
| . .+==++|
| o. o.o=|
| o.o o .+|
| ..+ = . .|
| S++.+ . |
| .+o*= o |
| ..=*.. |
| . ++= o |
| .oooEB. |
+----[SHA256]-----+
Append the contents of $HOME/.ssh/id_rsa.pub to $HOME/.ssh/authorized_keys on both the master and the worker nodes. Display the public key so it can be copied to each node's authorized_keys file.
cat $HOME/.ssh/id_rsa.pub
Returns.
ssh-rsa AAAAB3NzaC1. . . .
Install Spark on the master node.
cd /tmp
curl -O https://d3kbcqa49mib13.cloudfront.net/spark-2.2.0-bin-hadoop2.7.tgz
Returns.
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 194M 100 194M 0 0 64.9M 0 0:00:02 0:00:02 --:--:-- 64.8M
Unpack the tar ball and move the files to /opt.
tar zxvf spark-2.2.0-bin-hadoop2.7.tgz
mv spark-2.2.0-bin-hadoop2.7 /opt
Add environment variables to $HOME/.bashrc
export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.212.b04-0.el7_6.x86_64
export SPARK_HOME=/opt/spark-2.2.0-bin-hadoop2.7
export PATH=$PATH:$SPARK_HOME/bin
Make the environment variables known to the current session.
source ~/.bashrc
Prepare the spark-env.sh file.
cp $SPARK_HOME/conf/spark-env.sh.template $SPARK_HOME/conf/spark-env.sh
Add the following configuration items to $SPARK_HOME/conf/spark-env.sh, using the same JAVA_HOME path set in .bashrc.
export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.212.b04-0.el7_6.x86_64
export SPARK_EXECUTOR_CORES=1
export SPARK_EXECUTOR_MEMORY=1800M
export SPARK_WORKER_INSTANCES=2
Prepare the slaves file.
cd $SPARK_HOME/conf
cp slaves.template slaves
Add the hostname of the slave node to the slaves file.
Spark-node-vm
Create a tar ball of the configured setup.
cd /opt
tar czf spark.tar.gz spark-2.2.0-bin-hadoop2.7
Prepare the slave node
Copy the configured tar ball from the master to the slave node.
scp spark.tar.gz Spark-node-vm:/tmp
ssh to the slave node and unpack the configured tar ball into /opt so that $SPARK_HOME resolves to the same path as on the master.
cd /opt
tar xzvf /tmp/spark.tar.gz
Initialize the Spark cluster
From the master node, start the Spark cluster.
$SPARK_HOME/sbin/start-all.sh
Returns.
starting org.apache.spark.deploy.master.Master, logging to /opt/spark-2.2.0-bin-hadoop2.7/logs/spark-trb-org.apache.spark.deploy.master.Master-1-Py4SAS-vm.out
Spark-node-vm: starting org.apache.spark.deploy.worker.Worker, logging to /opt/spark-2.2.0-bin-hadoop2.7/logs/spark-trb-org.apache.spark.deploy.worker.Worker-1-Spark-node-vm.out
Spark-node-vm: starting org.apache.spark.deploy.worker.Worker, logging to /opt/spark-2.2.0-bin-hadoop2.7/logs/spark-trb-org.apache.spark.deploy.worker.Worker-2-Spark-node-vm.out
PySpark
Add the bin directory for Spark to the $PATH environment variable in the ~/.bashrc file.
export PATH=$PATH:/opt/spark-2.2.0-bin-hadoop2.7/bin
source ~/.bashrc
Open port 7077 on the master node.
az vm open-port --priority 1010 --resource-group Py4SAS --name Py4SAS-vm --port 7077
As a shake-down test, grab this example PySpark code, core_test.py, and submit it to the cluster. It generates 1,000,000 records with 100-byte key/value pairs and sorts on the key.
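The contents of core_test.py are not reproduced here, but a rough sketch of that kind of key/value sort test might look like the following. The file name sort_test.py, the 10-byte key / 90-byte value split, and the partition count are illustrative assumptions, not taken from the actual script.
import random
import string

from pyspark import SparkContext

# sort_test.py: a hypothetical stand-in for core_test.py. Generate
# 1,000,000 records of 100-byte key/value pairs and sort them by key.
sc = SparkContext(appName="sort-test")

def make_pair(_):
    key = ''.join(random.choice(string.ascii_letters) for _ in range(10))
    value = ''.join(random.choice(string.ascii_letters) for _ in range(90))
    return (key, value)

pairs = sc.parallelize(range(1000000), 8).map(make_pair)
print(pairs.sortByKey().count())

sc.stop()
Submit the actual core_test.py to the cluster from the master node.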
spark-submit --master spark://localhost:7077 core_test.py -a
The script returns timing statistics amid a copious amount of logging.
19/07/02 22:34:23 INFO DAGScheduler: ResultStage 4 (count at /home/trb/pyspark/core_test.py:104) finished in 8.843 s
19/07/02 22:34:23 INFO DAGScheduler: Job 3 finished: count at /home/trb/pyspark/core_test.py:104, took 33.756283 s
PySpark Examples
Make the findspark library available to the conda environment so we can run PySpark scripts out of a Jupyter notebook.
conda install -c conda-forge findspark
Enable execution of PySpark from a Jupyter notebook by adding these environment variables to ~/.bashrc.
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS='notebook'
Source the .bashrc file.
source ~/.bashrc
Start the notebook.
jupyter notebook --no-browser --port 8888 --ip=0.0.0.0 --notebook-dir=$HOME/notebook
Initialize the Spark context.
import findspark
findspark.init()
import pyspark
from pyspark import SparkContext
sc = SparkContext()
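A bare SparkContext() does not automatically attach to the standalone cluster started earlier; to run the notebook examples on the cluster rather than in local mode, pass the master URL explicitly. The host name below is an assumption for this setup, and only one SparkContext can be active at a time, so use one form or the other.
# Alternative to the bare SparkContext() call above: attach to the
# standalone cluster explicitly (assumed host name and default port).
sc = SparkContext(master="spark://Py4SAS-vm:7077", appName="pyspark-examples")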
Initialize the SQLContext and read the .csv file.
from pyspark.sql import SQLContext
url = "https://raw.githubusercontent.com/thomaspernet/data_csv_r/master/data/adult.csv"
from pyspark import SparkFiles
sc.addFile(url)
sqlContext = SQLContext(sc)
Create the df DataFrame. inferSchema = True allows the sqlContext.read.csv method to infer column types from the data.
df = sqlContext.read.csv(SparkFiles.get("adult.csv"), header=True, inferSchema = True)
Return the schema for the df DataFrame.
df.printSchema()
Returns.
root
|-- age: integer (nullable = true)
|-- workclass: string (nullable = true)
|-- fnlwgt: integer (nullable = true)
|-- education: string (nullable = true)
|-- education_num: integer (nullable = true)
|-- marital: string (nullable = true)
|-- occupation: string (nullable = true)
|-- relationship: string (nullable = true)
|-- race: string (nullable = true)
|-- sex: string (nullable = true)
|-- capital_gain: integer (nullable = true)
|-- capital_loss: integer (nullable = true)
|-- hours_week: integer (nullable = true)
|-- native_country: string (nullable = true)
|-- label: string (nullable = true)
Read the .csv file again, this time with the inferSchema = False argument, to create the df_string DataFrame, and print the schema.
df_string = sqlContext.read.csv(SparkFiles.get("adult.csv"), header=True, inferSchema = False)
df_string.printSchema()
Returns.
root
|-- age: string (nullable = true)
|-- workclass: string (nullable = true)
|-- fnlwgt: string (nullable = true)
|-- education: string (nullable = true)
|-- education_num: string (nullable = true)
|-- marital: string (nullable = true)
|-- occupation: string (nullable = true)
|-- relationship: string (nullable = true)
|-- race: string (nullable = true)
|-- sex: string (nullable = true)
|-- capital_gain: string (nullable = true)
|-- capital_loss: string (nullable = true)
|-- hours_week: string (nullable = true)
|-- native_country: string (nullable = true)
|-- label: string (nullable = true)
Write a function to convert a list of columns from type string to a new type, in this case float.
from pyspark.sql.types import *
def convertColumn(df, names, newType):
    for name in names:
        df = df.withColumn(name, df[name].cast(newType))
    return df
# List of target columns to convert
CONTI_FEATURES = ['age', 'fnlwgt','capital_gain', 'education_num', 'capital_loss', 'hours_week']
# Convert the type
df_string = convertColumn(df_string, CONTI_FEATURES, FloatType())
Validate the results.
df_string.printSchema()
Returns.
root
|-- age: float (nullable = true)
|-- workclass: string (nullable = true)
|-- fnlwgt: float (nullable = true)
|-- education: string (nullable = true)
|-- education_num: float (nullable = true)
|-- marital: string (nullable = true)
|-- occupation: string (nullable = true)
|-- relationship: string (nullable = true)
|-- race: string (nullable = true)
|-- sex: string (nullable = true)
|-- capital_gain: float (nullable = true)
|-- capital_loss: float (nullable = true)
|-- hours_week: float (nullable = true)
|-- native_country: string (nullable = true)
|-- label: string (nullable = true)
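As an aside, the same conversion can be written without the helper function by casting the target columns inside a single select. This is simply an equivalent formulation of the step above, sketched with a hypothetical df_cast name.
from pyspark.sql.functions import col

# Cast the continuous features to float in one pass and keep the
# remaining columns unchanged.
df_cast = df_string.select(
    [col(c).cast("float").alias(c) if c in CONTI_FEATURES else col(c)
     for c in df_string.columns])
df_cast.printSchema()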
Fetch the first 5 rows for the age and fnlwgt columns.
df.select('age','fnlwgt').show(5)
Returns.
+---+------+
|age|fnlwgt|
+---+------+
| 39| 77516|
| 50| 83311|
| 38|215646|
| 53|234721|
| 28|338409|
+---+------+
only showing top 5 rows
Group by education and return a count in ascending order.
df.groupBy("education").count().sort("count", ascending=True).show()
Returns.
+------------+-----+
| education|count|
+------------+-----+
| Preschool| 51|
| 1st-4th| 168|
| 5th-6th| 333|
| Doctorate| 413|
| 12th| 433|
| 9th| 514|
| Prof-school| 576|
| 7th-8th| 646|
| 10th| 933|
| Assoc-acdm| 1067|
| 11th| 1175|
| Assoc-voc| 1382|
| Masters| 1723|
| Bachelors| 5355|
|Some-college| 7291|
| HS-grad|10501|
+------------+-----+
Produce a crosstab of age by label.
df.crosstab('age', 'label').sort("age_label").show()
Returns.
+---------+-----+----+
|age_label|<=50K|>50K|
+---------+-----+----+
| 17| 395| 0|
| 18| 550| 0|
| 19| 710| 2|
| 20| 753| 0|
| 21| 717| 3|
| 22| 752| 13|
| 23| 865| 12|
| 24| 767| 31|
| 25| 788| 53|
| 26| 722| 63|
| 27| 754| 81|
| 28| 748| 119|
| 29| 679| 134|
| 30| 690| 171|
| 31| 705| 183|
| 32| 639| 189|
| 33| 684| 191|
| 34| 643| 243|
| 35| 659| 217|
| 36| 635| 263|
+---------+-----+----+
only showing top 20 rows