How to Use the PySpark Client


When is this applicable?

If you would like to use PySpark functionality in your Flow, you can do so by following the steps in this article.  

 

How-To

At the top of your Flow, add the following import statements, as well as a DATA_LINK variable that names the persisted output:

import pyspark.sql.functions as f
from predict_backend.flow.flow import Flow, Link
from predict_backend.persistence.spark import SparkHandler, SparkPersistence

DATA_LINK = 'transformation_step_output'

This will import your PySpark functionality as well as the persistence handlers the Virtualitics AI Platform uses to interact with PySpark. 

For this example, we will use two Steps: a Transform Step, where the computation is performed, and a DataShow Step, where we can view and interact with the data.

You will also need to modify the run() functions for the Steps that will have PySpark computation offloaded onto them by adding a spark_session=None keyword argument. The Virtualitics AI Platform will automatically pass the spark_session argument for you.

class Transform(Step):
    def run(self, flow_metadata: dict, spark_session=None):
        ...

class DataShow(Step):
    def run(self, flow_metadata: dict, spark_session=None):
        ...

When you instantiate the classes at the bottom of your Flow, you will need to pass in a uses_pyspark=True argument, like so: 

transform_section = Section("Example Transform Step", [])
transform_page = Page("", PageType.DATA_LAB, {}, [transform_section])
transform_step = Transform("Aggregate and Transform",
                           "Aggregate and Transform",
                           "",
                           StepType.DATA_LAB,
                           transform_page,
                           uses_pyspark=True)
                           
data_show_section = Section("Example Data Show Step", [])
data_show_page = Page("", PageType.DATA_LAB, {}, [data_show_section])
data_show_step = DataShow("Results",
                          "Results",
                          "",
                          StepType.DATA_LAB,
                          data_show_page,
                          uses_pyspark=True)

You can now use PySpark functions in your Step. For example, if you have a pandas DataFrame, the following code snippet will convert it to a Spark DataFrame and persist it to the store_interface. You will also need to define an out_link Link for persistence.

class Transform(Step):
    def run(self, flow_metadata: dict, spark_session=None):
        # store_interface is the Step's store interface, created from
        # flow_metadata at the start of run() as in your other Steps
        out_link: Link = store_interface.get_outlink()

        # Assuming you have a pandas DataFrame named "pandas_df"
        df = spark_session.createDataFrame(pandas_df)

        # Perform PySpark functionality here
        # ...

        # Persist to store_interface
        sp = SparkPersistence(handler=SparkHandler(spark_session=spark_session),
                              name=DATA_LINK,
                              data=df)
        out_link.add_data(sp)
        store_interface.save_link(out_link)
        spark_session.stop()
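
For illustration, the "Perform PySpark functionality here" section above might contain a simple aggregation built with the pyspark.sql.functions module imported as f at the top of the Flow. This is a minimal sketch; the column names "category" and "amount" are placeholders for your own data:

# Minimal sketch of a transformation; "category" and "amount" are placeholder column names
df = (df.groupBy("category")
        .agg(f.sum("amount").alias("total_amount"),
             f.count("*").alias("row_count"))
        .orderBy(f.desc("total_amount")))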

To access the persisted PySpark DataFrame in a different Step, use an in_link Link. Make sure to stop the spark_session explicitly at the end of each Step.

class DataShow(Step):
    def run(self, flow_metadata: dict, spark_session=None):
        # store_interface is created from flow_metadata as in your other Steps
        in_link: Link = store_interface.get_inlink()

        # Re-load the data persisted in the previous Step and convert it
        # into a pandas DataFrame to show in a table
        df = in_link.data[DATA_LINK].get(spark_session=spark_session).toPandas()
        spark_session.stop()
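
Keep in mind that toPandas() collects the entire DataFrame onto the driver, so for large datasets you may want to reduce the data in Spark before converting. A minimal sketch, where the 1,000-row limit is an arbitrary example value:

spark_df = in_link.data[DATA_LINK].get(spark_session=spark_session)
# Limit (or filter/aggregate) in Spark before collecting to the driver
df = spark_df.limit(1000).toPandas()
spark_session.stop()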

 

What to Expect (Validation)

Computation will now be off-loaded onto the PySpark client and persisted across the Flow. You should be able to monitor the status of PySpark jobs from your Spark master node or in your Databricks interface. Additionally, keep in mind that certain logs may go through the Spark logging system.
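
If the Spark logs from your Steps are too verbose or too quiet, you can adjust the log level on the underlying SparkContext from inside a Step's run() method. A minimal sketch using the standard PySpark API:

# Adjust Spark's log verbosity; valid levels include "DEBUG", "INFO", "WARN", and "ERROR"
spark_session.sparkContext.setLogLevel("WARN")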

 

Additional Details

You can also integrate with S3. Once your data is in S3, you can add the following line to your Steps to read it as a PySpark DataFrame (this example assumes the data is stored as JSON):

df = spark_session.read.json(s3_path)
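
The same pattern works with the other standard Spark readers if your data is stored in a different format; for example, for Parquet or CSV data at the same kind of S3 path:

# s3_path is a placeholder for your own S3 URI, e.g. an s3a:// bucket path
df = spark_session.read.parquet(s3_path)
# or, for CSV data:
df = spark_session.read.csv(s3_path, header=True, inferSchema=True)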
