Apache Spark

Overview

Apache Spark is a fast and general-purpose cluster computing system. It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs.

Versions 1.1 (with Hadoop 2.4), 1.6 (with Hadoop 2.6), 2.3.1 (with Hadoop 2.7) and 2.4.3 are installed on the CSF (we recommend using the latest available version).

Restrictions on use

There are no restrictions on accessing Spark on the CSF. It is distributed under the terms of the Apache License v2.0.

Set up procedure

We now recommend loading modulefiles within your jobscript so that you have a full record of how the job was run. See the example jobscript below for how to do this. Alternatively, you may load modulefiles on the login node and let the job inherit these settings.

Load one of the following modulefiles:

module load apps/binapps/apache-spark/2.4.3
module load apps/binapps/apache-spark/2.3.1
module load apps/binapps/apache-spark/1.6.2
module load apps/binapps/apache-spark/1.6.0
module load apps/binapps/apache-spark/1.1.0

Running the application

Please do not run Spark on the login node. Jobs should be submitted to the compute nodes via the batch system.

On the CSF Spark should be run in standalone cluster mode. This means that you submit a batch job to the CSF batch system in the usual way (with a jobscript) to reserve a number of cores, which will give you access to one or more compute nodes. You then start up a temporary Spark cluster on those compute nodes and run your Spark application within that temporary Spark cluster. When your Spark application finishes the Spark cluster is shut down and the batch job ends.

Hence no Spark processes (e.g., the Spark cluster managers) are left running – the Spark standalone cluster only exists for the duration of your batch job in the CSF batch system.

Parallel batch job submission

Make sure you have the modulefile loaded (ideally within your jobscript, as recommended above), then create a batch submission script, for example:

#!/bin/bash --login
#$ -cwd             # Job will run from the current directory

# Change to the required version
module load apps/binapps/apache-spark/1.6.2

#### CSF - Optionally specify whether you want a 256GB RAM node or a 512GB RAM node (keep only ONE of the mem lines, or neither) ####

#$ -l mem256

## or

#$ -l mem512

#$ -pe smp.pe 16         # Single compute node only (can be 2-16 cores) if using mem256 or mem512.

######### Otherwise use only *ONE* of the following Parallel Environments (delete the unused lines) ########

#$ -pe smp.pe 24         # Single node where the number of cores can be 2-24

## or

#$ -pe mpi-24-ib.pe 48   # Multi-node using entire compute nodes (24 cores per node).
                         # The number of cores MUST be a multiple of 24 and a minimum of 48.
                         # In this example we use two compute nodes (2 x 24 = 48 cores)

# Generate a config file and temporary directories for the spark cluster
# See below for a description of what this does.
spark-csf-config.sh
   #
   # The default will create 1 worker and 1 executor per node.
   # You can add additional variables by adding '-e VAR=value' flags to the above line.
   # For example: if you wish to run 2 workers per slave node, 8 cores per worker and
   # give just under 50% of a 256GB node's memory to each worker use:
   #
   # spark-csf-config.sh -e SPARK_WORKER_INSTANCES=2 -e SPARK_WORKER_CORES=8 -e SPARK_WORKER_MEMORY=127G

# Inform Spark where the config file is
export SPARK_CONF_DIR=`spark-csf-config.sh -d`

# Set up the environment (note that there is a . then a space at the start of the line)
. ${SPARK_CONF_DIR}/spark-env.sh

# Start the Spark cluster
start-all.sh

# Submit our spark app to the spark cluster.
# Note we set executor-memory to 32GB (each executor can use 32GB)
# EDIT THIS LINE
spark-submit --master=$MASTER_URL --verbose --executor-memory 32G myapp.py arg1 arg2

# Stop the Spark cluster
stop-all.sh

# OPTIONAL: Cleanup any temporary worker directories in our scratch area
spark-csf-config.sh -cw

Note that you must include all of the lines unless they are indicated as optional. You will need to edit the line where you submit your application to the Spark cluster, but the other lines that set up the cluster and start/stop the processes must be present. Your Spark job will not run correctly otherwise.
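The myapp.py used in the spark-submit line stands in for your own Spark application. As an illustration only (the file name, application name and the arguments arg1 and arg2 are placeholders taken from the jobscript above, and the word-count logic is simply a sketch assuming Spark 2.x), such an application might look like:

# myapp.py - illustrative PySpark application only (not supplied with the CSF install)
import sys
from pyspark.sql import SparkSession

# spark-submit supplies the master URL via --master, so it is not set here
spark = SparkSession.builder.appName("csf-spark-example").getOrCreate()

# arg1 and arg2 from the jobscript: an input text file and an output directory
input_path, output_path = sys.argv[1], sys.argv[2]

# A simple word count to show the structure of a Spark job
counts = (spark.sparkContext.textFile(input_path)
                .flatMap(lambda line: line.split())
                .map(lambda word: (word, 1))
                .reduceByKey(lambda a, b: a + b))
counts.saveAsTextFile(output_path)

spark.stop()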

The spark-csf-config.sh script at the start of your jobscript does the following:

  1. Creates a config directory (named spark.JOBID) in your job’s current working directory to hold a Spark config file (named spark-env.sh) and a subdir (named logs) for log files. JOBID is the unique number assigned to your batch job when you run qsub.
  2. Creates a file named spark-slaves in the above directory containing the names of the CSF compute nodes to be used for worker processes. Spark will automatically read this file and start the worker processes.
  3. Creates a temporary directory named spark-temp.JOBID in your scratch area for worker temporary files. This can usually be deleted at the end of a job.
  4. The spark-env.sh file written in the config directory sets the following environment variables:
    • SPARK_CONF_DIR to the above spark.JOBID directory.
    • SPARK_MASTER_IP to the name of the first CSF compute node assigned to your job (v1.6.x).
    • SPARK_MASTER_HOST to the name of the first CSF compute node assigned to your job (v2.3.x).
    • MASTER_URL to spark://${SPARK_MASTER_IP}:7077
    • SPARK_SLAVES to a file containing the hostnames of the slave nodes.
    • SPARK_WORKER_CORES to the number of cores per compute node based on the total number of cores you request in the jobscript.
    • SPARK_WORKER_DIR to the temporary scratch directory ~/scratch/spark-temp.JOBID
    • SPARK_LOG_DIR to a directory inside the temp conf dir (see above)

The above environment file is used by the Spark master and slave processes to ensure log files and temporary files are written to appropriate locations in your storage areas.

You should now submit the jobscript to the batch system using:

qsub scriptname

where scriptname is the name of your jobscript.

When the job runs, check the scriptname.oJOBID and scriptname.eJOBID files for messages from Spark. You can ignore any warnings about X11 authority if present. You will have a temporary directory in your current directory named spark.JOBID which contains more log files in a subdirectory named logs and the spark-slaves file (in case you are interested to see where your Spark cluster is running).

Memory Requirements

The memory settings in Apache Spark are somewhat complicated. One of our users has offered the following advice:

  • The default settings are as follows:

    The worker gets:

    • all the available memory minus 1G
    • the number of cores available (16 in the CSF case)

    The executor gets:

    • 1GB of memory
    • all of the cores available on the worker

    By default, one executor is created per worker, and the settings apply per node; that is, all of the nodes will have 1 worker and 1 executor if you leave the defaults in place, using:

    spark-csf-config.sh
    

    Alternatively, the number of workers can be changed as shown in the submission script:

    spark-csf-config.sh -e SPARK_WORKER_INSTANCES=2 -e SPARK_WORKER_CORES=8 -e SPARK_WORKER_MEMORY=127G
    

    The number of executors can be changed by dividing up the resources that the worker has. For example, to have two executors from a worker with ~500GB of memory and 16 cores, the submission script should contain:

    # Generate a spark config file for the compute nodes our job is running on
    spark-csf-config.sh
    
    # Set up the environment (note that there is a . then a space at the start of the line)
    . ${SPARK_CONF_DIR}/spark-env.sh
     
    # This will make the application use 64GB and 16 cores (2 executors, each with 32GB and 8 cores)
    spark-submit --master=$MASTER_URL --verbose --executor-memory 32G --executor-cores 8 myapp.py arg1 arg2
    

    The maximum number of executors per node that you could have is 16, each with 32GB of memory and 1 core, given a maximum of 16 cores. Technically you could even set 1GB per executor and have 512 executors with 1 core each (cores = threads, but ideally the number of executors should not exceed the number of CPU cores), although that is unlikely to be the best thing to do.

    Things to bear in mind: you cannot set the executor memory to be larger than the memory available to the worker; if you do, the executor will not start. The executor will also fail if you set the worker to have more memory than the node physically has. For example, on the CSF 512GB nodes, if you set the worker memory to 800GB and the executor memory to 800GB, the failure will occur in the executor; the worker itself will not report any memory error. A small check you can run from inside your application is sketched below.
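If you are unsure what settings your application has actually been given, you can print them from within the application itself. The following snippet is an illustrative sketch only (assuming Spark 2.x; the property names are standard Spark configuration keys) that reports the master URL and executor settings in use:

# Illustrative sanity check from inside your own PySpark application (Spark 2.x)
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

# Print the master URL of the temporary Spark cluster and the executor settings.
# The second argument to get() is only a fallback string shown if the key is unset.
print("Master URL:         ", sc.master)
print("Executor memory:    ", sc.getConf().get("spark.executor.memory", "1g (default)"))
print("Executor cores:     ", sc.getConf().get("spark.executor.cores", "all worker cores (default)"))
print("Default parallelism:", sc.defaultParallelism)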

Further info

Updates

None.
