Battery of code examples to demonstrate the capabilities of Pravega as a data stream storage system for Apache Spark.
Install Ubuntu 18.04 LTS. Other operating systems can also be used but the commands below have only been tested on this version.
sudo apt-get install openjdk-11-jdk
You may have multiple versions of Java installed. Ensure that Java 11 is the default with the command below.
sudo update-alternatives --config java
This will run a development instance of Pravega locally.
cd
git clone https://github.com/pravega/pravega
cd pravega
git checkout r0.9
./gradlew startStandalone
This will install a development instance of Spark locally.
Download https://www.apache.org/dyn/closer.lua/spark/spark-3.0.2/spark-3.0.2-bin-hadoop2.7.tgz.
mkdir -p ~/spark
cd ~/spark
tar -xzvf ~/Downloads/spark-3.0.2-bin-hadoop2.7.tgz
ln -s spark-3.0.2-bin-hadoop2.7 current
export PATH="$HOME/spark/current/bin:$PATH"
By default, the script run_spark_ap.sh
will use an in-process Spark mini-cluster
that is started with the Spark job (--master local[2]
).
Run a PySpark batch job that reads events from the file sample_data.json and writes to a Pravega stream
./run_pyspark_app.sh src/main/python/batch_file_to_pravega.py
The file sample_data.json has the following conents:
{"id": 1, "key": "a", "message": "Hello world."}
{"id": 2, "key": "b", "message": "Welcome to Pravega!"}
{"id": 3, "key": "a", "message": "Another message."}
{"id": 4, "key": "c", "message": "Another message."}
{"id": 5, "key": "c", "message": "Another message."}
You should see output similar to the following:
+--------------------------------------------------+-----------+
|event |routing_key|
+--------------------------------------------------+-----------+
|{"id":1,"key":"a","message":"Hello world."} |a |
|{"id":2,"key":"b","message":"Welcome to Pravega!"}|b |
|{"id":3,"key":"a","message":"Another message."} |a |
|{"id":4,"key":"c","message":"Another message."} |c |
|{"id":5,"key":"c","message":"Another message."} |c |
+--------------------------------------------------+-----------+
./run_pyspark_app.sh src/main/python/batch_pravega_to_console.py
You should see output similar to the following:
+--------------------------------------------------+--------+------------+----------+------+
|event |scope |stream |segment_id|offset|
+--------------------------------------------------+--------+------------+----------+------+
|{"id":4,"key":"c","message":"Another message."} |examples|batchstream1|1 |0 |
|{"id":5,"key":"c","message":"Another message."} |examples|batchstream1|1 |55 |
|{"id":1,"key":"a","message":"Hello world."} |examples|batchstream1|4 |0 |
|{"id":2,"key":"b","message":"Welcome to Pravega!"}|examples|batchstream1|4 |51 |
|{"id":3,"key":"a","message":"Another message."} |examples|batchstream1|4 |109 |
+--------------------------------------------------+--------+------------+----------+------+
./run_pyspark_app.sh src/main/python/stream_generated_data_to_pravega.py
This job will continue to run and write events until stopped.
rm -rf /tmp/spark-checkpoints-stream_pravega_to_console
./run_pyspark_app.sh src/main/python/stream_pravega_to_console.py
This will read events generated by stream_generated_data_to_pravega.py. It will continue to run until stopped.
You should see output similar to the following every 3 seconds:
-------------------------------------------
Batch: 0
-------------------------------------------
+-----------------------+--------+-----------------+----------+------+
|event |scope |stream |segment_id|offset|
+-----------------------+--------+-----------------+----------+------+
|2019-05-23 14:40:08.329|examples|streamprocessing1|0 |181753|
|2019-05-23 14:40:10.329|examples|streamprocessing1|0 |181784|
|2019-05-23 14:40:09.329|examples|streamprocessing1|0 |181815|
+-----------------------+--------+-----------------+----------+------+
rm -rf /tmp/spark-checkpoints-stream_pravega_to_pravega
./run_spark_app.sh src/main/python/stream_pravega_to_pravega.py
This will read events generated by stream_generated_data_to_pravega.py. It will continue to run until stopped.
rm -rf /tmp/spark-checkpoints-StreamPravegaToConsole
./run_java_spark_app.sh io.pravega.example.spark.StreamPravegaToConsole
This will read events generated by stream_generated_data_to_pravega.py. It will continue to run until stopped.
You should see output similar to the following every 3 seconds:
+-----------------------+--------+-----------------+----------+------+
|event |scope |stream |segment_id|offset|
+-----------------------+--------+-----------------+----------+------+
|2019-05-23 14:45:59.329|examples|streamprocessing1|0 |192634|
|2019-05-23 14:46:01.329|examples|streamprocessing1|0 |192665|
|2019-05-23 14:46:00.329|examples|streamprocessing1|0 |192696|
+-----------------------+--------+-----------------+----------+------+
If you press Control-C to abort the job, and then restart it, you will see that it will start exactly where it left off.
This will be apparent because the batch number will not start at 0.
This recovery information stored in the checkpoint directory.
If you delete the checkpoint directory, or specify an empty directory, it will start from the earliest event
since the option start_stream_cut
is set to earliest
.
If you make significant changes to your Spark application, Spark may encounter an error when it tries to load the checkpoint.
In this case, you will need to delete the checkpoint directory.
Start a separate Spark server process.
~/spark/current/sbin/start-all.sh
Confirm that you can browse to the Spark Master UI at http://localhost:8080/.
Submit the job.
USE_IN_PROCESS_SPARK=0 ./run_pyspark_app.sh src/main/python/stream_generated_data_to_pravega.py
USE_IN_PROCESS_SPARK=0 ./run_java_spark_app.sh io.pravega.example.spark.StreamPravegaToConsole
These steps show how to use Anaconda Python to run PySpark applications. Anaconda Python is configured with Numpy, Pandas, and TensorFlow.
See https://www.anaconda.com/rpm-and-debian-repositories-for-miniconda/.
source /opt/conda/etc/profile.d/conda.sh
src/main/python/create_conda_env.sh
conda activate pravega-samples
Run the following before executing spark-submit
.
export PYSPARK_PYTHON=$HOME/.conda/envs/pravega-samples/bin/python