When is this applicable?
In order to use the Dask client, the deployed solution must include a Dask scheduler alongside any number of Dask workers. Not all Flows will benefit from the Dask parallel computing framework; however, Dask can help your Flows scale to large amounts of data. Please note that you may need to work with your DevOps or IT teams to configure the Dask scheduler and workers. (See DevOps Note)
First, be sure to import dask appropriately. Then, set the dask_client parameter to None in the run() function of your Step.
The Dask library offers several ways to run functions in parallel, such as decorating them with @dask.delayed and executing them with dask.compute(). You can also convert pandas DataFrames to Dask DataFrames, as described in Dask's documentation.
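As a minimal sketch of the delayed/compute pattern: calls to a @dask.delayed function build lazy tasks, and dask.compute() executes them (in parallel when a Dask cluster or local scheduler is available):

```python
import dask


@dask.delayed
def double(x):
    # Each call builds a lazy task instead of executing immediately
    return x * 2


# Nothing runs until dask.compute() is called
tasks = [double(i) for i in range(4)]
results = dask.compute(*tasks)  # executes all tasks, returns a tuple
print(results)  # (0, 2, 4, 6)
```

Without a configured scheduler, dask.compute() falls back to a local scheduler, so the same code works in development and against a cluster.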
The following snippet shows one possible way to integrate Dask with the Virtualitics AI Platform.
```python
import dask
import numpy as np


@dask.delayed
def function(x):
    return x * 2


class DataUpload(Step):
    def run(self, flow_metadata, dask_client=None):
        store_interface = StoreInterface(**flow_metadata)
        pandas_df = store_interface.get_asset(label="stored_asset", type=AssetType.DATASET)
        # Split the DataFrame into one piece per column
        eps = np.split(pandas_df, len(pandas_df.columns), axis=1)
        resultList = []
        try:
            def f(eps):
                results = []
                for ep in eps:
                    # Each call to the delayed function builds a lazy task
                    y = function(ep)
                    results.append(y)
                return results

            # Execute all delayed tasks on the Dask cluster
            resultList = dask.compute(*f(eps))
        except Exception as e:
            print(e)
```
Alongside the core services (backend, frontend, worker, s3, store, rabbitmq, redis, websocket) that are part of the default start.sh script, the following additional services are needed in order to use Dask (this configuration may vary on Kubernetes deployments). The DASK_SCHEDULER_URL environment variable is used by predict to connect to the Dask cluster. Example to include in your dev.env:
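Based on the scheduler service defined in the Docker Compose configuration below (hostname dask-scheduler, listening on port 8786), the dev.env entry would look like:

```
DASK_SCHEDULER_URL=tcp://dask-scheduler:8786
```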
When running in a Kubernetes environment, this value is passed to the backend through the Predict ConfigMap. This URL can be a local cluster address if the Dask endpoint is running on the Kubernetes cluster. Multiple Dask worker replicas can be configured via the Dask Helm chart.
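As a sketch, a ConfigMap carrying this value might look like the following; the ConfigMap name, key placement, and in-cluster service address are assumptions and will depend on your deployment:

```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  # Hypothetical name; match your actual Predict ConfigMap
  name: predict-config
data:
  # Assumed in-cluster address of the Dask scheduler Service
  DASK_SCHEDULER_URL: "tcp://dask-scheduler.default.svc.cluster.local:8786"
```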
```yaml
dask-scheduler:
  image: daskdev/dask:2021.4.1
  hostname: dask-scheduler
  ports:
    - "8786:8786"
    - "8787:8787"
  command: ["dask-scheduler"]
  environment:
    - EXTRA_PIP_PACKAGES=s3fs pyarrow numpy==1.19.5 pandas==1.3.1 --upgrade

dask-worker:
  image: daskdev/dask:2021.4.1
  hostname: dask-worker
  command: ["dask-worker", "tcp://dask-scheduler:8786", "--memory-limit=1e9", "--nprocs=1", "--nthreads=1"]
  environment:
    - EXTRA_PIP_PACKAGES=s3fs pyarrow numpy==1.19.5 pandas==1.3.1 --upgrade
```
What to Expect (Validation)
Results should be computed by the Dask workers and displayed in the Virtualitics AI Platform as expected.