HowTo:spark

Apache Spark is a fast and general engine for big data processing, with built-in modules for streaming, SQL, machine learning and graph processing. This page is a how-to guide on using Apache Spark on the SW cluster.

Spark job scripts need to do two things:

  • To set up Spark appropriately, run source /opt/gaussian/setup-spark.sh (note that this also autoloads Java 8, Anaconda 2/3, and R).
  • Run your application with spark-submit. Make sure to set proper values for executor and driver memory, or you will likely run into memory-related errors! --master local[#] sets up and uses a local standalone Spark cluster with # workers for your job (using "$NSLOTS" for "#" automatically sets the number of workers to the number of cores requested by your job). An example invocation is sketched below.

Note that HDFS is not installed on the SW cluster. To use the cluster filesystem instead, use file:///path/to/file instead of hdfs://host:8020/path/to/file in your scripts.
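
As a sketch of how the points above fit together (wordcount.py and the input path are hypothetical placeholders for your own application and data), a spark-submit line inside a job script might look like this:

# Run wordcount.py on $NSLOTS local workers, reading input straight from
# the cluster filesystem (file:///) since HDFS is not available.
spark-submit --master local[$NSLOTS] \
             --executor-memory 50G \
             --driver-memory 5G \
             wordcount.py file:///path/to/input.txt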

Template job

The following template script (found at /opt/global/spark/spark-job.sh) is a good starting point for a typical Spark job. You must edit it to reflect the number of cores and the amount of memory you will actually use (the "-pe glinux.pe #" and "-l mf=#G" lines), or your job may be scheduled with sub-optimal resources.

This particular job will start a Spark application on a node with 55 GB of memory, 4 CPUs, and local scratch disks for improved I/O performance. STDOUT and STDERR will be written to nameOfJob.oJob# and nameOfJob.eJob#, respectively.

#!/bin/bash
#$ -S /bin/bash
#$ -o $JOB_NAME.o$JOB_ID
#$ -e $JOB_NAME.e$JOB_ID
#$ -cwd
#$ -V
#$ -l qname=abaqus.q

# This line should be equal to the total executor and driver memory used.
#$ -l mf=55G

# Number of worker cores you want to use
#$ -pe glinux.pe 4

# Setup your environment to use Spark
source /opt/gaussian/setup-spark.sh

# Edit this line to change how your job is run:
spark-submit --master local[$NSLOTS] --executor-memory 50G --driver-memory 5G example-spark-application.py
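
Once you have edited a copy of the template, submit it like any other batch job. The lines below assume the usual Grid Engine commands on the SW cluster (qsub to submit, qstat to check status); adjust the script name to whatever you called your copy.

hpc####@swlogin1$ cp /opt/global/spark/spark-job.sh .
hpc####@swlogin1$ qsub spark-job.sh
hpc####@swlogin1$ qstat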

Using Python 3 instead of Python 2

You can use Python 3 instead of Python 2 by setting the environment variable PYSPARK_PYTHON.

hpc####@swlogin1$ use spark
hpc####@swlogin1$ use anaconda3
hpc####@swlogin1$ export PYSPARK_PYTHON=python3
hpc####@swlogin1$ pyspark
Python 3.4.5 |Anaconda 2.3.0 (64-bit)| (default, Jul  2 2016, 17:47:47) 
[GCC 4.4.7 20120313 (Red Hat 4.4.7-1)] on linux
Type "help", "copyright", "credits" or "license" for more information.
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel).
17/01/13 15:56:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/

Using Python version 3.4.5 (default, Jul  2 2016 17:47:47)
SparkSession available as 'spark'.
>>> 
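
The same setting works in a batch job. A minimal sketch, assuming the Anaconda 2/3 autoloaded by setup-spark.sh puts python3 on your PATH: export PYSPARK_PYTHON before the spark-submit line of the template script.

# After "source /opt/gaussian/setup-spark.sh" and before spark-submit:
export PYSPARK_PYTHON=python3
spark-submit --master local[$NSLOTS] --executor-memory 50G --driver-memory 5G example-spark-application.py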

Troubleshooting

My job is running out of memory!

Increase --executor-memory and --driver-memory accordingly. Make sure to increase the #$ -l mf=#G as well!
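
For example (illustrative numbers only): to give the executor 70G and the driver 5G, the scheduler request and the spark-submit flags have to change together, since mf should equal the total executor plus driver memory.

# Request 75G from the scheduler (70G executor + 5G driver)
#$ -l mf=75G

# ...and pass the matching values to Spark:
spark-submit --master local[$NSLOTS] --executor-memory 70G --driver-memory 5G example-spark-application.py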

My job is stuck in "qw" state!

The "glinux.pe" part of the template script requests nodes with fast local scratch disks that considerably improve performance (Spark likes to cache certain information to disk... without the scratch disks, there is a >3x loss in performance). These nodes typically have 24 cpus and 256 GB memory, so your jobs may get stuck in "qw" (queue) state if they try to pick more (there are very few nodes that are larger than that AND have a scratch disk). If you need more cores or memory, you can change glinux.pe (requests node with a scratch disk) to shm.pe (any node), but be prepared for the performance hit.

My job is stuck in "Rr" or "Rq" states!

The scheduler will auto-reject your job unless your job script contains the source /opt/gaussian/setup-spark.sh line.