{"id":124,"date":"2018-09-03T19:33:57","date_gmt":"2018-09-03T18:33:57","guid":{"rendered":"http:\/\/ri.itservices.manchester.ac.uk\/csf3\/?page_id=124"},"modified":"2025-07-17T09:39:39","modified_gmt":"2025-07-17T08:39:39","slug":"apache-spark","status":"publish","type":"page","link":"https:\/\/ri.itservices.manchester.ac.uk\/csf3\/software\/applications\/apache-spark\/","title":{"rendered":"Apache Spark"},"content":{"rendered":"<h2>Overview<\/h2>\n<p><a href=\"http:\/\/spark.apache.org\/docs\/latest\/index.html\">Apache Spark<\/a> is a fast and general-purpose cluster computing system. It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs.<\/p>\n<p>Versions 1.1 (with Hadoop 2.4), 1.6 (with Hadoop 2.6), 2.3.1 (with Hadoop 2.7), 2.4.3 and 4.0.0 are installed on the CSF (we recommend using the latest available version).<\/p>\n<h2>Restrictions on use<\/h2>\n<p>There are no restrictions on accessing Spark on the CSF. It is distributed under the terms of the <a href=\"http:\/\/www.apache.org\/licenses\/\">Apache License v2.0<\/a>.<\/p>\n<h2>Set up procedure<\/h2>\n<p>We now recommend loading modulefiles within your jobscript so that you have a full record of how the job was run. See the example jobscript below for how to do this. Alternatively, you may load modulefiles on the login node and let the job <abbr title=\"add '#$ -V' to your jobscript\">inherit these settings<\/abbr>.<\/p>\n<p>Load one of the following modulefiles:<\/p>\n<pre>\r\nmodule load apps\/binapps\/apache-spark\/4.0.0    #Works on CSF3_SLURM\r\nmodule load apps\/binapps\/apache-spark\/2.4.3\r\nmodule load apps\/binapps\/apache-spark\/2.3.1\r\nmodule load apps\/binapps\/apache-spark\/1.6.2\r\nmodule load apps\/binapps\/apache-spark\/1.6.0\r\nmodule load apps\/binapps\/apache-spark\/1.1.0\r\n<\/pre>\n<h2>Running the application<\/h2>\n<p>Please do not run Spark on the login node. 
Jobs should be submitted to the compute nodes via batch.<\/p>\n<p>On the CSF, Spark should be run in <em>standalone cluster<\/em> mode. This means that you submit a batch job to the CSF batch system in the usual way (with a jobscript) to reserve a number of cores, which will give you access to one or more compute nodes. You then start up a temporary Spark cluster on those compute nodes and run your Spark application within that temporary Spark cluster. When your Spark application finishes, the Spark cluster is shut down and the batch job ends. <\/p>\n<p>Hence no Spark processes (e.g., the Spark cluster managers) are left running &#8211; the Spark standalone cluster only exists for the duration of your batch job in the CSF batch system.<\/p>\n<h3>Parallel batch job submission<\/h3>\n<p>Make sure you have the modulefile loaded, then create a batch submission script, for example:<\/p>\n<pre class=\"slurm\">\r\n#!\/bin\/bash --login\r\n#SBATCH -p multicore     # Partition 'multicore'\r\n#SBATCH -n 16            # Select no. 
of CPU core(s)\r\n#SBATCH -t 1-0           # Walltime 1-day & 0-hour\r\n#SBATCH -J spark         # Jobname\r\n#SBATCH -o %x.o%j        # Output file (%x = SLURM_JOB_NAME)\r\n#SBATCH -e %x.e%j        # Error file  (%j = SLURM_JOB_ID)\r\n\r\n\r\n# Change to the required version\r\nmodule load apps\/binapps\/apache-spark\/4.0.0\r\n\r\n# Generate a config file and temporary directories for the Spark cluster\r\n# See below for a description of what this does.\r\nspark-csf-config.sh\r\n   #\r\n   # The default will create 1 worker and 1 executor per node.\r\n   # You can add additional variables by adding '-e VAR=value' flags to the above line.\r\n   # For example: if you wish to run 2 workers per slave node, 8 cores per worker and\r\n   # give just under 50% of a 256GB node's memory to each worker, use:\r\n   #\r\n   # spark-csf-config.sh -e SPARK_WORKER_INSTANCES=2 -e SPARK_WORKER_CORES=8 -e SPARK_WORKER_MEMORY=127G\r\n\r\n# Inform Spark where the config file is\r\nexport SPARK_CONF_DIR=`spark-csf-config.sh -d`\r\n\r\n# Set up the environment (note the line starts with a . followed by a space)\r\n. 
${SPARK_CONF_DIR}\/spark-env.sh\r\n\r\n# Start the Spark cluster\r\nstart-all.sh\r\n\r\n# Submit our Spark app to the Spark cluster.\r\n# Note we set executor-memory to 8GB (each executor can use 8GB)\r\n# EDIT THIS LINE\r\nspark-submit --master $MASTER_URL --verbose --executor-memory 8g myapp.py arg1 arg2\r\n\r\n# Stop the Spark cluster\r\nstop-all.sh\r\n\r\n# OPTIONAL: Cleanup any temporary worker directories in our scratch area\r\nspark-csf-config.sh -cw\r\n<\/pre>\n<pre class=\"sge\">\r\n#!\/bin\/bash --login\r\n#$ -cwd             # Job will run from the current directory\r\n\r\n# Change to the required version\r\nmodule load apps\/binapps\/apache-spark\/1.6.2\r\n\r\n#### CSF - Optionally specify whether you want a 512GB RAM node ####\r\n\r\n#$ -l mem512\r\n\r\n#$ -pe smp.pe 16         # Single compute node only (can be 2-16) if using mem512.\r\n\r\n######### Multi-node: Use only *ONE* of the following Parallel Environments (delete unused line) ########\r\n\r\n#$ -pe smp.pe 24         # Single node where the number of cores can be 2-24\r\n\r\n## or\r\n\r\n#$ -pe mpi-24-ib.pe 48   # Multi-node using entire compute nodes (24 cores per node).\r\n                         # The number of cores MUST be a multiple of 24 and a minimum of 48.\r\n                         # In this example we use two compute nodes (2 x 24 = 48 cores)\r\n\r\n# Generate a config file and temporary directories for the Spark cluster\r\n# See below for a description of what this does.\r\nspark-csf-config.sh\r\n   #\r\n   # The default will create 1 worker and 1 executor per node.\r\n   # You can add additional variables by adding '-e VAR=value' flags to the above line.\r\n   # For example: if you wish to run 2 workers per slave node, 8 cores per worker and\r\n   # give just under 50% of a 256GB node's memory to each worker, use:\r\n   #\r\n   # spark-csf-config.sh -e SPARK_WORKER_INSTANCES=2 -e SPARK_WORKER_CORES=8 -e SPARK_WORKER_MEMORY=127G\r\n\r\n# Inform Spark where the 
config file is\r\nexport SPARK_CONF_DIR=`spark-csf-config.sh -d`\r\n\r\n# Set up the environment (note the line starts with a . followed by a space)\r\n. ${SPARK_CONF_DIR}\/spark-env.sh\r\n\r\n# Start the Spark cluster\r\nstart-all.sh\r\n\r\n# Submit our Spark app to the Spark cluster.\r\n# Note we set executor-memory to 32GB (each executor can use 32GB)\r\n# EDIT THIS LINE\r\nspark-submit --master=$MASTER_URL --verbose --executor-memory 32G myapp.py arg1 arg2\r\n\r\n# Stop the Spark cluster\r\nstop-all.sh\r\n\r\n# OPTIONAL: Cleanup any temporary worker directories in our scratch area\r\nspark-csf-config.sh -cw\r\n<\/pre>\n<p>Note that you must include all of the lines unless they are indicated as optional. You will edit the line where you submit your application to the Spark cluster, but the other lines that set up the cluster and start\/stop the processes must be present. Your Spark job will not run correctly otherwise.<\/p>\n<p>The <code>spark-csf-config.sh<\/code> script at the start of your jobscript does the following:<\/p>\n<ol>\n<li>Creates a config directory (named <code>spark.<em>JOBID<\/em><\/code>) in your job&#8217;s <em>current working directory<\/em> to hold a Spark config file (named <code>spark-env.sh<\/code>) and a subdir (named <code>logs<\/code>) for log files. <em>JOBID<\/em> is the unique number assigned to your batch job when you run <code>sbatch<\/code> (or <code>qsub<\/code>).<\/li>\n<li>Creates a file named <code>spark-slaves<\/code> in the above directory containing the names of the compute nodes to be used for worker processes. Spark will <em>automatically<\/em> read this file and start the worker processes.<\/li>\n<li>Creates a temporary directory named <code>spark-temp.<em>JOBID<\/em><\/code> in your scratch area for worker temporary files. 
This can usually be deleted at the end of a job.<\/li>\n<li>The <code>spark-env.sh<\/code> file written in the config directory sets the following environment variables:\n<ul>\n<li><code>SPARK_CONF_DIR<\/code> to the above <code>spark.<em>JOBID<\/em><\/code> directory.<\/li>\n<li><code>SPARK_MASTER_IP<\/code> to the name of the first compute node assigned to your job (v1.6.x).<\/li>\n<li><code>SPARK_MASTER_HOST<\/code> to the name of the first compute node assigned to your job (v2.3.x).<\/li>\n<li><code>MASTER_URL<\/code> to <code>spark:\/\/${SPARK_MASTER_IP}:7077<\/code><\/li>\n<li><code>SPARK_SLAVES<\/code> to a file containing the hostnames of the slave nodes.<\/li>\n<li><code>SPARK_WORKER_CORES<\/code> to the number of cores per compute node based on the total number of cores you request in the jobscript.<\/li>\n<li><code>SPARK_WORKER_DIR<\/code> to the temporary scratch directory <code>~\/scratch\/spark-temp.<em>JOBID<\/em><\/code><\/li>\n<li><code>SPARK_LOG_DIR<\/code> to a directory inside the temp conf dir (see above)<\/li>\n<\/ul>\n<\/li>\n<\/ol>\n<p>The above environment file is used by the Spark master and slave processes to ensure log files and temporary files are written to appropriate locations in your storage areas.<\/p>\n<p>You should now submit the jobscript to the batch system using:<\/p>\n<pre class=\"slurm\">sbatch <em>scriptname<\/em><\/pre>\n<pre class=\"sge\">qsub <em>scriptname<\/em><\/pre>\n<p>where <em>scriptname<\/em> is the name of your jobscript.<\/p>\n<p>When the job runs, check the <code><em>jobname<\/em>.o<em>JOBID<\/em><\/code> and <code><em>jobname<\/em>.e<em>JOBID<\/em><\/code> files for messages from Spark. You can ignore any warnings about X11 authority if present. 
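<\/p>\n<p>The jobscripts above submit a user application called <code>myapp.py<\/code> with two arguments &#8211; these are placeholders for your own application. Purely as an illustrative sketch (the filename and argument meanings are assumptions, not a CSF-provided example), a minimal PySpark application of that shape might look like:<\/p>\n<pre>\r\n# myapp.py - hypothetical minimal PySpark application.\r\n# Here we assume arg1 is an input text file and arg2 an output directory.\r\nimport sys\r\n\r\ndef parse_args(argv):\r\n    # argv[0] is the script name; expect exactly two arguments\r\n    if len(argv) != 3:\r\n        sys.exit('usage: myapp.py input_path output_dir')\r\n    return argv[1], argv[2]\r\n\r\ndef main():\r\n    # pyspark is only importable once the apache-spark modulefile is loaded\r\n    from pyspark.sql import SparkSession\r\n    in_path, out_path = parse_args(sys.argv)\r\n    # No .master() call here - the master URL comes from the\r\n    # --master $MASTER_URL flag on the spark-submit line.\r\n    spark = SparkSession.builder.appName('myapp').getOrCreate()\r\n    lines = spark.read.text(in_path)\r\n    print('Number of lines:', lines.count())\r\n    lines.write.mode('overwrite').text(out_path)\r\n    spark.stop()\r\n\r\nif __name__ == '__main__':\r\n    main()\r\n<\/pre>\n<p>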
You will have a temporary directory in your current directory named <code>spark.<em>JOBID<\/em><\/code> which contains more log files in a subdirectory named <code>logs<\/code> and the <code>spark-slaves<\/code> file (in case you are interested to see where your Spark cluster is running).<\/p>\n<h2>Memory Requirements<\/h2>\n<p>The memory settings in Apache Spark are somewhat complicated. One of our users has offered the following advice:<\/p>\n<ul>\n<li>\n<p>The default settings are as follows:<\/p>\n<p>The worker gets:<\/p>\n<ul>\n<li>all the available memory minus 1G<\/li>\n<li>the number of cores available (16 in the CSF case)<\/li>\n<\/ul>\n<p>The executor gets:<\/p>\n<ul>\n<li>1GB of memory<\/li>\n<li>all of the cores available on the worker<\/li>\n<\/ul>\n<p>By default, one executor is created per worker, and the settings are applied per node; that is, all of the nodes will have 1 worker and 1 executor if you leave the defaults, using:<\/p>\n<pre>\r\nspark-csf-config.sh\r\n<\/pre>\n<p>Alternatively, the number of workers can be changed as shown in the submission script:<\/p>\n<pre>\r\nspark-csf-config.sh -e SPARK_WORKER_INSTANCES=2 -e SPARK_WORKER_CORES=8 -e SPARK_WORKER_MEMORY=127G\r\n<\/pre>\n<p>The number of executors can be changed by dividing the resources that the worker has. For example, to have two executors from a worker with ~500GB of memory and 16 cores, the submission script should be:<\/p>\n<pre>\r\n# Generate a Spark config file for the compute nodes our job is running on\r\nspark-csf-config.sh\r\nexport SPARK_CONF_DIR=`spark-csf-config.sh -d`\r\n\r\n# Set up the environment (note the line starts with a . followed by a space)\r\n. ${SPARK_CONF_DIR}\/spark-env.sh\r\n\r\n# This will make the application use 64GB and 16 cores (2 executors, each with 32GB and 8 cores)\r\nspark-submit --master=$MASTER_URL --verbose --executor-memory 32G --executor-cores 8 myapp.py arg1 arg2\r\n<\/pre>\n<p>The maximum number of executors per node that you could have is 16, each with 32GB and 1 core, considering a maximum of 16 cores. 
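<\/p>\n<p>As a rough illustration of that arithmetic (this is just a sketch, not a CSF tool &#8211; the function name is made up), the number of executors Spark can start on one worker is limited by both its cores and its memory:<\/p>\n<pre>\r\n# Sketch: how many executors fit on one worker, given the worker's\r\n# resources and the per-executor request (--executor-cores, --executor-memory)\r\ndef executors_per_worker(worker_cores, worker_mem_gb, exec_cores, exec_mem_gb):\r\n    # Spark starts as many executors as BOTH resources allow\r\n    return min(worker_cores \/\/ exec_cores, worker_mem_gb \/\/ exec_mem_gb)\r\n\r\n# 16 cores and ~512GB per worker; 1 core and 32GB per executor -> 16 executors\r\nprint(executors_per_worker(16, 512, 1, 32))\r\n<\/pre>\n<p>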
Technically you could even set 1GB per executor and have 512 executors with 1 core each (cores here are threads, but ideally the number should match the number of CPU cores), but that is unlikely to be the best choice.<\/p>\n<p>Bear in mind that you cannot give an executor more memory than is available in its worker; if you do, the executor will not start. The executor will also fail if the worker is configured with more memory than the node physically has. For example, on the CSF 512GB nodes, if you set both the worker memory and the executor memory to 800GB, the failure occurs in the executor; the worker itself will not raise any memory error.<\/p>\n<\/li>\n<\/ul>\n<h2>Further info<\/h2>\n<ul>\n<li><a href=\"http:\/\/spark.apache.org\">Apache Spark website<\/a><\/li>\n<\/ul>\n<h2>Updates<\/h2>\n<p>None.<\/p>\n","protected":false},"excerpt":{"rendered":"<p>Overview Apache Spark is a fast and general-purpose cluster computing system. It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs Versions 1.1 (with Hadoop 2.4), 1.6 (with Hadoop 2.6) and 2.3.1 (with Hadoop 2.7) are installed on the CSF (we recommend using the latest available version). Restrictions on use There are no restrictions on accessing Spark on the CSF. It is distributed under the terms.. 
<a href=\"https:\/\/ri.itservices.manchester.ac.uk\/csf3\/software\/applications\/apache-spark\/\">Read more &raquo;<\/a><\/p>\n","protected":false},"author":1,"featured_media":0,"parent":86,"menu_order":0,"comment_status":"closed","ping_status":"closed","template":"","meta":{"footnotes":""},"class_list":["post-124","page","type-page","status-publish","hentry"],"_links":{"self":[{"href":"https:\/\/ri.itservices.manchester.ac.uk\/csf3\/wp-json\/wp\/v2\/pages\/124","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/ri.itservices.manchester.ac.uk\/csf3\/wp-json\/wp\/v2\/pages"}],"about":[{"href":"https:\/\/ri.itservices.manchester.ac.uk\/csf3\/wp-json\/wp\/v2\/types\/page"}],"author":[{"embeddable":true,"href":"https:\/\/ri.itservices.manchester.ac.uk\/csf3\/wp-json\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/ri.itservices.manchester.ac.uk\/csf3\/wp-json\/wp\/v2\/comments?post=124"}],"version-history":[{"count":9,"href":"https:\/\/ri.itservices.manchester.ac.uk\/csf3\/wp-json\/wp\/v2\/pages\/124\/revisions"}],"predecessor-version":[{"id":10719,"href":"https:\/\/ri.itservices.manchester.ac.uk\/csf3\/wp-json\/wp\/v2\/pages\/124\/revisions\/10719"}],"up":[{"embeddable":true,"href":"https:\/\/ri.itservices.manchester.ac.uk\/csf3\/wp-json\/wp\/v2\/pages\/86"}],"wp:attachment":[{"href":"https:\/\/ri.itservices.manchester.ac.uk\/csf3\/wp-json\/wp\/v2\/media?parent=124"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}