diff --git a/README.md b/README.md
index 655ccd1b7a6c49a96b712a38fda8e3dbbb1d6064..a66f4e2c5e48d2b49c611291d4258b2951d6f167 100644
--- a/README.md
+++ b/README.md
@@ -1,12 +1,37 @@
 # Spark-Examples
 
 ## Interactive Spark Cluster
-Script `start_spark_cluster.sh`.
+Script `start_spark_cluster.sh`. Spins up a Spark cluster with the specified number of nodes.
+To start, simply execute
+```bash
+sbatch start_spark_cluster.sh
+```
+This will return information similar to
+```
+Submitted batch job 6525353
+```
+In order to connect, you need to find out the hostname of your compute job.
+```
+[kesselheim1@jwlogin23 spark-examples]$ squeue --me
+   JOBID PARTITION     NAME     USER ST   TIME  NODES NODELIST(REASON)
+ 6525353 develboos spark-cl kesselhe  R  30:40      2 jwb[0129,0149]
+```
 
-Spawns a Spark Master and Spark Workers and waits until Job Timeout. Weird parametrization (e.g. 7 workers threads) to see that it actually works. Unclear:
-- In the Scala Example, is uses 7 worker instances as expected. The Python Example uses only 2. Why?
-- How to run workers on more than one node? Probably with `srun`
-- Executing `srun -n 1 start-worker.sh $MASTER_URL` leads to a quiet failure (?) of the workers. Why?
+Then you can run a Spark app with a command similar to
+```bash
+module load Stages/2023 GCC OpenMPI Spark
+export MASTER_URL=spark://jwb0129i.juwels:4124
+python pyspark_pi.py
+```
+Note the `i` that has been added to the master hostname.
+
+
+Open Questions:
+- In the Scala example, it uses all worker instances as expected. The Python example uses only 2. Why?
+
+ToDos:
+- Include a Python virtual environment
+- Create a notebook that illustrates how to run the Pi example in Jupyter
 
 ## References
 - Pi Estimate (Python + Scala): [](https://spark.apache.org/examples.html)
diff --git a/pyspark_pi.py b/pyspark_pi.py
index 9668cf55fe5fd59bc8aa9361189fce10eb9558e4..d5eb1b575b2915eb4b66c197f56efd954cd2f884 100644
--- a/pyspark_pi.py
+++ b/pyspark_pi.py
@@ -1,8 +1,22 @@
-import pyspark
-import random
+from pyspark.sql import SparkSession
 import os
+import random
+
+home = os.environ["HOME"]
+spark_master = os.environ["MASTER_URL"]
+
+# This is required to add an "i" to the hostname
+tmp = os.environ["HOSTNAME"].split("."); tmp[0] += "i"; spark_driver_hostname = ".".join(tmp)
+
+spark = SparkSession \
+    .builder \
+    .appName("My SparkSession") \
+    .config("spark.master", spark_master) \
+    .config("spark.driver.memory", "10g") \
+    .config("spark.driver.host", spark_driver_hostname) \
+    .config("spark.executor.memory", "400g") \
+    .getOrCreate()
 
-sc = pyspark.SparkContext(os.environ["MASTER_URL"])
+sc = spark.sparkContext
 
 def inside(p):
     x, y = random.random(), random.random()
diff --git a/start_spark_cluster.sh b/start_spark_cluster.sh
index db4b4a98cdee1a8b61833ceba192995d257296d6..3682789af94c541acca59c26c53f7118a9d759f8 100644
--- a/start_spark_cluster.sh
+++ b/start_spark_cluster.sh
@@ -2,8 +2,8 @@
 #SBATCH --partition=develbooster
 #SBATCH --account=atmlaml
-#SBATCH --nodes=1
-#SBATCH --tasks-per-node=9
+#SBATCH --nodes=2
+#SBATCH --tasks-per-node=1
 #SBATCH --time=00:60:00
 #SBATCH --gres gpu
 #SBATCH --job-name spark-cluster
@@ -13,11 +13,22 @@ module load Stages/2023 GCC OpenMPI Spark
 JOB="$SLURM_JOB_NAME-$SLURM_JOB_ID"
 export SPARK_WORKER_DIR="$SLURM_SUBMIT_DIR/$JOB/worker"
 export SPARK_LOG_DIR="$SLURM_SUBMIT_DIR/$JOB/log"
-export SPARK_MASTER_HOST=`hostname`
+
+# We need hostnames with an "i" appended to the end of the first part of the
+# hostname. This is done here.
+# If not, export SPARK_MASTER_HOST=`hostname` would be enough.
+export SPARK_MASTER_HOST=`hostname | sed -E "s/(.*)\.(.*)/\1i.\2/g"`
 export SPARK_MASTER_PORT="4124"
-export SPARK_WORKER_CORES=7
-export SPARK_WORKER_INSTANCES=5 ## This is not working yet
+
+# Change this to alter the number of workers and their maximum number of threads
+export SPARK_WORKER_CORES=92
+export SPARK_WORKER_INSTANCES=1
+
+# These options seem not to have any effect. It is not clear why.
+#export SPARK_WORKER_MEMORY="10G"
+#export SPARK_EXECUTOR_MEMORY="10G"
+
 export MASTER_URL="spark://${SPARK_MASTER_HOST}:${SPARK_MASTER_PORT}"
+mkdir -p ${SPARK_LOG_DIR}
 export > ${SPARK_LOG_DIR}/env.txt
 
 echo "------------ Starting Spark Server -------------"
@@ -25,16 +36,6 @@ echo "MASTER_URL: $MASTER_URL"
 echo "------------------------------------------------"
 start-master.sh
 
-echo "------------ Starting Spark Workers ------------"
-echo "MASTER_URL: $MASTER_URL"
-echo "------------------------------------------------"
-
-#srun -n 14 spark-class org.apache.spark.deploy.worker.Worker $MASTER_URL
-#srun -n 1 start-worker.sh $MASTER_URL
-start-worker.sh $MASTER_URL
-
-. wait-worker.sh
-
 echo "----------- How to test with spark shell ------"
 echo "sgoto $SLURM_JOB_ID 0"
 echo "module load Stages/2023 GCC OpenMPI Spark"
@@ -65,8 +66,23 @@ echo
 echo "---------- How to kill -------------------------"
 echo scancel $SLURM_JOB_ID
 echo "------------------------------------------------"
+echo
+echo
+echo
+echo "------------ Starting Spark Workers ------------"
+echo "MASTER_URL: $MASTER_URL"
+echo "------------------------------------------------"
+
+export SPARK_NO_DAEMONIZE=1
+srun bash -c 'export SPARK_PUBLIC_DNS=`hostname | sed -E "s/(.*)\.(.*)/\1i.\2/g"` ;
+    start-worker.sh $MASTER_URL;'
+# export SPARK_LOCAL_IP=`getent ahosts $SPARK_PUBLIC_DNS | head -n1 | cut -d " " -f 1`;
 
-sleep infinity
+# This script can be used for workers that start in daemon mode.
+#. wait-worker.sh
+# Then we need to sleep.
+# sleep infinity
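
A note on the open question about the Python example: one way to see how much of the cluster a Python app is actually granted is to inspect the `SparkContext` after it connects. The snippet below is a minimal sketch under the same assumptions as `pyspark_pi.py` (cluster started via `start_spark_cluster.sh`, `MASTER_URL` exported, `i`-suffixed driver hostname); the file name `check_cluster.py` and the app name are illustrative and not part of this diff.

```python
# check_cluster.py -- hypothetical helper, not part of this repository
import os
from pyspark.sql import SparkSession

# Same "i"-suffix trick for the driver hostname as in pyspark_pi.py
tmp = os.environ["HOSTNAME"].split(".")
tmp[0] += "i"
driver_host = ".".join(tmp)

spark = (SparkSession.builder
         .appName("cluster-check")                        # illustrative app name
         .config("spark.master", os.environ["MASTER_URL"])
         .config("spark.driver.host", driver_host)
         .getOrCreate())
sc = spark.sparkContext

# Roughly the total number of cores granted to this application by the standalone master
print("defaultParallelism:", sc.defaultParallelism)
# Number of partitions a default RDD gets, i.e. the parallelism actually used by default
print("default RDD partitions:", sc.parallelize(range(1000)).getNumPartitions())

spark.stop()
```

If `defaultParallelism` already reports all worker cores, the "only 2" observation is more likely a partitioning question in the Pi example than workers failing to register.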