How to Use the Dask Client

When is this applicable?

In order to use the Dask client, the deployed solution needs to include a Dask scheduler alongside any number of Dask workers. Not all Flows will benefit from the Dask parallel computing framework; however, Dask can help your Flows scale to handle large amounts of data. Please note that you may need to work with your DevOps or IT team to configure the Dask scheduler and workers (see the DevOps Note below).

 

How-To

First, be sure to import dask. Then, add a dask_client parameter that defaults to None to the run() method of your Step definition.

The Dask library offers several ways to run functions in parallel, for example by decorating them with dask.delayed and executing them with dask.compute. You can also convert pandas DataFrames into Dask DataFrames, as described in Dask's documentation.
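For illustration, here is a minimal sketch of converting a pandas DataFrame into a Dask DataFrame with dask.dataframe.from_pandas; the column name and partition count are arbitrary assumptions, not part of the original example.

import dask.dataframe as dd
import pandas as pd

# Hypothetical pandas DataFrame; inside a Flow this would typically come from the StoreInterface.
pandas_df = pd.DataFrame({"value": range(1000)})

# Split the DataFrame into partitions so Dask can operate on them in parallel.
dask_df = dd.from_pandas(pandas_df, npartitions=4)

# Operations are lazy; .compute() triggers execution (on the cluster if a Dask client is active).
doubled_sum = (dask_df["value"] * 2).sum().compute()
print(doubled_sum)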

The following snippet shows one possible way to integrate Dask within the Virtualitics AI Platform.

import dask
import numpy as np

# Step, StoreInterface, and AssetType are provided by the Virtualitics AI Platform SDK.

@dask.delayed
def function(x):
    # Decorated with dask.delayed: calls are collected into a task graph
    # instead of running immediately.
    return x * 2


class DataUpload(Step):
    def run(self, flow_metadata, dask_client=None):
        # dask_client is passed in by the platform when a Dask scheduler is deployed (see DevOps Note).
        store_interface = StoreInterface(**flow_metadata)
        pandas_df = store_interface.get_asset(label="stored_asset", type=AssetType.DATASET)

        # Split the DataFrame into one piece per column so each piece can be processed in parallel.
        pieces = np.split(pandas_df, len(pandas_df.columns), axis=1)
        result_list = []
        try:
            def build_tasks(pieces):
                # Build one delayed task per piece.
                return [function(piece) for piece in pieces]

            # dask.compute executes all delayed tasks, distributing them to the Dask workers.
            result_list = dask.compute(*build_tasks(pieces))
        except Exception as e:
            print(e)
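Alternatively, the injected client can be used directly. The following is a hypothetical sketch that assumes dask_client is a connected dask.distributed.Client instance supplied by the platform; the class name DataUploadWithClient and the helper double are illustrative, while client.submit and client.gather are standard dask.distributed APIs.

def double(column):
    # Plain (non-delayed) function executed on a worker via dask_client.submit.
    return column * 2


class DataUploadWithClient(Step):
    def run(self, flow_metadata, dask_client=None):
        store_interface = StoreInterface(**flow_metadata)
        pandas_df = store_interface.get_asset(label="stored_asset", type=AssetType.DATASET)

        # Submit one task per column; each submit returns a Future running on the cluster.
        futures = [dask_client.submit(double, pandas_df[col]) for col in pandas_df.columns]

        # Block until all futures complete and collect their results.
        results = dask_client.gather(futures)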

 

DevOps Note

Alongside the core services (backend, frontend, worker, s3 store, rabbitmq, redis, websocket) that are part of the default start.sh script, the following additional services are needed in order to use Dask (this configuration may vary on Kubernetes deployments). The DASK_SCHEDULER_URL environment variable is used by predict to connect to the Dask cluster. For example, include the following in your dev.env:

DASK_SCHEDULER_URL=tcp://dask-scheduler:8786
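For reference, a minimal sketch of how a process can connect to the scheduler at this address using dask.distributed; in a deployed Flow the platform establishes this connection for you, so this is illustrative only, and the fallback address is an assumption matching the compose example below.

import os
from dask.distributed import Client

# Read the scheduler address configured in dev.env.
scheduler_url = os.environ.get("DASK_SCHEDULER_URL", "tcp://dask-scheduler:8786")

# Connect to the running Dask scheduler; the client becomes the default
# executor for subsequent dask.compute calls in this process.
client = Client(scheduler_url)
print(client)  # Prints the scheduler address and the number of connected workers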

When running in a Kubernetes environment, this value is passed to the backend through the Predict ConfigMap. The URL can be a local cluster address if the Dask endpoint is running on the Kubernetes cluster, and multiple Dask worker replicas can be configured through the Dask Helm chart.

# Dask scheduler service; port 8786 is the scheduler endpoint and 8787 is the Dask dashboard.
dask-scheduler:
    image: daskdev/dask:2021.4.1
    hostname: dask-scheduler
    ports:
      - "8786:8786"
      - "8787:8787"
    command: ["dask-scheduler"]
    environment:
      # EXTRA_PIP_PACKAGES installs additional Python packages when the container starts.
      - EXTRA_PIP_PACKAGES=s3fs pyarrow numpy==1.19.5 pandas==1.3.1 --upgrade
# Dask worker service; points at the scheduler address and limits its resources.
dask-worker:
    image: daskdev/dask:2021.4.1
    hostname: dask-worker
    command: ["dask-worker", "tcp://dask-scheduler:8786", "--memory-limit=1e9", "--nprocs=1", "--nthreads=1"]
    environment:
      - EXTRA_PIP_PACKAGES=s3fs pyarrow numpy==1.19.5 pandas==1.3.1 --upgrade

 

What to Expect (Validation)

Results should be computed on the Dask workers and displayed in the Virtualitics AI Platform as expected. The Dask dashboard, exposed on port 8787 in the example above, can also be used to confirm that tasks are being executed on the workers.
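As a quick connectivity check, a sketch along these lines can confirm that workers are registered with the scheduler; the address is an assumption matching the DevOps Note above.

from dask.distributed import Client

# Connect to the scheduler configured in the DevOps Note.
client = Client("tcp://dask-scheduler:8786")

# scheduler_info() reports the connected workers; an empty dict means no workers have joined.
workers = client.scheduler_info()["workers"]
print(f"{len(workers)} Dask worker(s) connected")

# Run a trivial task on the cluster to confirm end-to-end execution.
assert client.submit(lambda x: x + 1, 1).result() == 2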
