When is this applicable?
If you would like to use PySpark functionality in your Flow, you can do so by following the steps in this article.
How-To
At the top of your Flow, you will want to add the following import statements, as well as a named DATA_LINK variable to handle persistence:
import pyspark.sql.functions as f
from predict_backend.flow.flow import Flow, Link
from predict_backend.persistence.spark import SparkHandler, SparkPersistence
DATA_LINK = 'transformation_step_output'
This will import your PySpark functionality as well as the persistence handlers the Virtualitics AI Platform uses to interact with PySpark.
For this example, we will use two Steps: a Transform Step, where the computation is performed, and a DataShow Step, where we can view and interact with the data.
You will also need to modify the run() functions for the Steps that will have PySpark computation offloaded onto them by adding a spark_session=None keyword argument. The Virtualitics AI Platform will automatically handle the spark_session argument passing for you.
class Transform(Step):
    def run(self, flow_metadata: dict, spark_session=None):
        ...

class DataShow(Step):
    def run(self, flow_metadata: dict, spark_session=None):
        ...
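At runtime, the Virtualitics AI Platform injects a live SparkSession into this argument, so you can use it directly inside run(). As a quick sanity check (the print statement below is purely illustrative), you could log the Spark version the Step is running against:

class Transform(Step):
    def run(self, flow_metadata: dict, spark_session=None):
        # spark_session is injected by the platform at runtime
        print(f"Transform Step running on Spark {spark_session.version}")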
When you instantiate the classes at the bottom of your Flow, you will need to pass in a uses_pyspark=True argument, like so:
transform_section = Section("Example Transform Step", [])
transform_page = Page("", PageType.DATA_LAB, {}, [transform_section])
transform_step = Transform("Aggregate and Transform",
                           "Aggregate and Transform",
                           "",
                           StepType.DATA_LAB,
                           transform_page,
                           uses_pyspark=True)

data_show_section = Section("Example Data Show Step", [])
data_show_page = Page("", PageType.DATA_LAB, {}, [data_show_section])
data_show_step = DataShow("Results",
                          "Results",
                          "",
                          StepType.DATA_LAB,
                          data_show_page,
                          uses_pyspark=True)
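After the Steps are constructed, they are added to your Flow in the usual way. The exact flow-assembly code depends on the rest of your Flow; the snippet below is only a sketch that assumes the common Flow constructor and a chain() method for ordering the Steps, which may differ in your platform version:

# Sketch only: the Flow name/description/image and chain() call are assumptions
flow = Flow("PySpark Example Flow", "Example Flow using PySpark", "")
flow.chain([transform_step, data_show_step])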
You can now use PySpark functions in your Step. For example, if you have a pandas DataFrame, the following code snippet will convert it to a Spark DataFrame and persist it through the store_interface. You will also need to define the out_link Link for persistence.
class Transform(Step):
    def run(self, flow_metadata: dict, spark_session=None):
        # store_interface is assumed to have been created from flow_metadata
        # earlier in this run() method (not shown here)
        out_link: Link = store_interface.get_outlink()

        # Assuming you have a pandas df named "pandas_df"
        df = spark_session.createDataFrame(pandas_df)

        # Perform PySpark functionality here
        # ...

        # Persist to store_interface
        sp = SparkPersistence(handler=SparkHandler(spark_session=spark_session),
                              name=DATA_LINK,
                              data=df)
        out_link.add_data(sp)
        store_interface.save_link(out_link)
        spark_session.stop()
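As an illustration of the "Perform PySpark functionality here" placeholder above, the pyspark.sql.functions module imported as f at the top of the Flow can be used for standard transformations. The column names below (category and amount) are hypothetical:

# Hypothetical aggregation: total "amount" per "category", largest first
df = (df.groupBy("category")
        .agg(f.sum("amount").alias("total_amount"))
        .orderBy(f.desc("total_amount")))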
To access the persisted PySpark DataFrame in a different Step, use an in_link Link. Make sure to stop the spark_session explicitly at the end of each Step.
class DataShow(Step):
    def run(self, flow_metadata: dict, spark_session=None):
        # store_interface is assumed to have been created from flow_metadata
        # earlier in this run() method (not shown here)
        in_link: Link = store_interface.get_inlink()

        # Re-load the data persisted in the previous Step and convert it into
        # a pandas DataFrame to show in a table
        df = in_link.data[DATA_LINK].get(spark_session=spark_session).toPandas()
        spark_session.stop()
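Note that toPandas() collects the entire DataFrame onto the driver, so for large datasets you may want to reduce the data before converting. For example (the row limit here is arbitrary):

# Only bring back a limited number of rows for display
df = in_link.data[DATA_LINK].get(spark_session=spark_session).limit(1000).toPandas()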
What to Expect (Validation)
Computation will now be offloaded to PySpark, and the resulting data will be persisted across the Flow. You should be able to monitor the status of PySpark jobs on your Spark master node or in your Databricks interface. Additionally, please keep in mind that certain logs may go through the Spark logging system.
Additional Details
You can also add integration with S3. Once you have the data in S3, you can add the following line to your Steps to read the S3 data as a PySpark DataFrame:
df = spark_session.read.json(s3_path)
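The same pattern applies to other formats the Spark reader supports; for example (the path and options below are placeholders):

# CSV with a header row
df = spark_session.read.csv(s3_path, header=True, inferSchema=True)
# Parquet
df = spark_session.read.parquet(s3_path)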