If you would like to use PySpark functionality in your Flow, you can do so by following the steps in this article.
Setting Up PySpark
At the top of your Flow, add the following import statements, as well as a named DATA_LINK variable to handle persistence. This will import your PySpark functionality as well as the persistence handlers that Virtualitics Predict uses to interact with PySpark.
import pyspark.sql.functions as f
from predict_backend.flow.flow import Flow, Link
from predict_backend.persistence.spark import SparkHandler, SparkPersistence
DATA_LINK = 'transformation_step_output'
Transform and DataShow
For this example, we will use two Steps: a Transform step, where the computation is performed, and a DataShow step, where we can view and interact with the data.
You will also need to modify the run() functions of the Steps that will have PySpark computation offloaded onto them by adding a spark_session=None keyword argument. Virtualitics Predict automatically handles passing the spark_session argument for you.
class Transform(Step):
    def run(self, flow_metadata: dict, spark_session=None):
        ...

class DataShow(Step):
    def run(self, flow_metadata: dict, spark_session=None):
        ...
When you instantiate the classes at the bottom of your Flow, you will need to pass in a uses_pyspark=True argument, as shown below:
transform_section = Section("Example Transform Step", [])
transform_page = Page("", PageType.DATA_LAB, {}, [transform_section])
transform_step = Transform("Aggregate and Transform",
                           "Aggregate and Transform",
                           "",
                           StepType.DATA_LAB,
                           transform_page,
                           uses_pyspark=True)

data_show_section = Section("Example Data Show Step", [])
data_show_page = Page("", PageType.DATA_LAB, {}, [data_show_section])
data_show_step = DataShow("Results",
                          "Results",
                          "",
                          StepType.DATA_LAB,
                          data_show_page,
                          uses_pyspark=True)
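If a Step that should use PySpark is instantiated without uses_pyspark=True, the spark_session argument will simply be left at its None default. A small optional guard at the top of run() makes that failure mode obvious; this check is our own suggestion rather than part of the Predict API:
class Transform(Step):
    def run(self, flow_metadata: dict, spark_session=None):
        # Fail fast if no Spark session was injected for this Step
        if spark_session is None:
            raise RuntimeError("No spark_session provided; instantiate this Step with uses_pyspark=True.")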
Use PySpark Functions
You can now use PySpark functions in your Step. For example, if you have a pandas DataFrame, the following code snippet will convert it to a Spark DataFrame and persist it in the store_interface. You will also need to define the out_link Link used for persistence.
class Transform(Step):
    def run(self, flow_metadata: dict, spark_session=None):
        # store_interface is assumed to have been created earlier in run(); it is omitted here for brevity
        out_link: Link = store_interface.get_outlink()
        # Assuming you have a pandas df named "pandas_df"
        df = spark_session.createDataFrame(pandas_df)
        # Perform PySpark functionality here
        # ...
        # Persist to store_interface
        sp = SparkPersistence(handler=SparkHandler(spark_session=spark_session),
                              name=DATA_LINK,
                              data=df)
        out_link.add_data(sp)
        store_interface.save_link(out_link)
        spark_session.stop()
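The "Perform PySpark functionality here" placeholder above is where your own transformation logic goes. As one illustrative sketch, here is a simple aggregation using the pyspark.sql.functions import (aliased as f) from the top of the Flow; the column names category and value are hypothetical and would come from your own data:
# Hypothetical aggregation: average "value" per "category" (placeholder column names)
df = df.groupBy("category").agg(f.avg("value").alias("avg_value"))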
Access Persisted PySpark DataFrame
In order to access the persisted PySpark DataFrame in a different Step, you will use an in_link Link. Make sure to stop the spark_session explicitly at the end of each Step.
class DataShow(Step):
    def run(self, flow_metadata: dict, spark_session=None):
        in_link: Link = store_interface.get_inlink()
        # Re-load the data persisted in the previous Step and convert it into a pandas DataFrame to show in a table
        df = in_link.data[DATA_LINK].get(spark_session=spark_session).toPandas()
        spark_session.stop()
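The snippet above stops after converting the persisted data back to a pandas DataFrame. What you do with it next depends on how your DataShow page is laid out; as a minimal, Predict-agnostic sketch, you can inspect it with plain pandas before wiring it into a table element:
# Quick sanity checks on the re-loaded data (plain pandas; replace with your table element of choice)
print(df.shape)
print(df.head())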
What to Expect
Computation will now be off-loaded onto the PySpark client and persisted across the Flow. You should be able to monitor the status of PySpark jobs from your Spark master node or in your Databricks interface.
Additionally, please keep in mind that certain logs may go through the Spark logging system.
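If those Spark-side logs are too verbose, one option (standard PySpark, not specific to Virtualitics Predict) is to lower the log level on the session's SparkContext near the top of your run():
# Reduce Spark log verbosity; setLogLevel accepts levels such as "DEBUG", "INFO", "WARN", "ERROR"
spark_session.sparkContext.setLogLevel("WARN")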
Additional Details
You can also add integration with S3. Once you have your data in S3, you can add the following line to your Steps to read that data into a PySpark DataFrame:
df = spark_session.read.json(s3_path)
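Here, s3_path points at your JSON data in S3; the bucket and key below are placeholders, and other readers such as parquet follow the same pattern:
# Hypothetical S3 location; use the scheme (s3a:// or s3://) your cluster is configured for
s3_path = "s3a://your-bucket/path/to/data.json"

# Parquet data stored in S3 can be read with the corresponding reader, e.g.:
# df = spark_session.read.parquet("s3a://your-bucket/path/to/data.parquet")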