When is this applicable?
To use the Dask client, the deployed solution needs to include a Dask scheduler alongside any number of Dask workers. Not all Flows will benefit from the Dask parallel computing framework, but Dask can help your Flows scale to large amounts of data. Note that you may need to work with your DevOps or IT teams to configure the Dask scheduler and workers (see the DevOps Note below).
How-To
First, be sure to import dask. Then add a dask_client parameter, defaulting to None, to the run() method of your Step definition.
The Dask library offers several ways to mark functions for parallel execution, such as the @dask.delayed decorator, whose delayed results are then evaluated with dask.compute. You can also convert pandas DataFrames to Dask DataFrames, as described in Dask's documentation.
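For instance, a pandas DataFrame can be converted into a Dask DataFrame so that subsequent operations run in parallel across partitions; a minimal sketch using the standard dask.dataframe API (the data here is illustrative):

import pandas as pd
import dask.dataframe as dd

pandas_df = pd.DataFrame({"a": range(100), "b": range(100)})
# Partition the pandas DataFrame so Dask can operate on chunks in parallel
ddf = dd.from_pandas(pandas_df, npartitions=4)
# Operations are lazy; .compute() triggers the parallel evaluation
total = ddf["a"].sum().compute()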
The following snippet shows one possible way to integrate Dask with the Virtualitics AI Platform.
import dask
import numpy as np

# Step, StoreInterface, and AssetType come from the Virtualitics AI Platform
# SDK; import them with the path used in your deployment.


@dask.delayed
def function(x):
    return x * 2


class DataUpload(Step):
    def run(self, flow_metadata, dask_client=None):
        store_interface = StoreInterface(**flow_metadata)
        pandas_df = store_interface.get_asset(label="stored_asset", type=AssetType.DATASET)
        # Split the DataFrame into one single-column frame per column
        eps = np.split(pandas_df, len(pandas_df.columns), axis=1)
        result_list = []
        try:
            def f(eps):
                # Build a list of lazy (delayed) tasks, one per column
                results = []
                for ep in eps:
                    y = function(ep)
                    results.append(y)
                return results

            # Trigger parallel execution of all the delayed tasks
            result_list = dask.compute(*f(eps))
        except Exception as e:
            print(e)
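If your deployment injects a live dask.distributed client through the dask_client parameter (behavior can vary by platform configuration), work can also be submitted to it directly with the standard client.submit/client.gather API instead of dask.compute. A minimal sketch, with DataUploadDirect and double as hypothetical names:

def double(x):
    return x * 2


class DataUploadDirect(Step):
    def run(self, flow_metadata, dask_client=None):
        store_interface = StoreInterface(**flow_metadata)
        pandas_df = store_interface.get_asset(label="stored_asset", type=AssetType.DATASET)
        if dask_client is not None:
            # Submit one task per column; the futures resolve on the Dask workers
            futures = [dask_client.submit(double, pandas_df[col]) for col in pandas_df.columns]
            results = dask_client.gather(futures)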
DevOps Note
Alongside the core services (backend, frontend, worker, s3 store, rabbitmq, redis, websocket) that are part of the default start.sh script, the following services are also needed to use Dask (this configuration may vary on Kubernetes deployments). The DASK_SCHEDULER_URL environment variable is used by Predict to connect to the Dask cluster. For example, include the following in your dev.env:
DASK_SCHEDULER_URL=tcp://dask-scheduler:8786
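The backend uses this address to reach the scheduler. For a rough idea of what that connection looks like, here is a minimal sketch using the standard dask.distributed API (how Predict itself connects is internal to the platform):

import os
from dask.distributed import Client

# Connect to the scheduler named by the environment variable above
client = Client(os.environ["DASK_SCHEDULER_URL"])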
When running in a Kubernetes environment, this value is passed to the backend through the Predict ConfigMap. The URL can be a local cluster address if the Dask endpoint runs on the same Kubernetes cluster, and multiple Dask worker replicas can be configured through the Dask Helm chart.
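For example, the worker replica count lives under the chart's worker values; a minimal values.yaml sketch (field names follow the public Dask Helm chart and should be checked against your chart version):

worker:
  replicas: 3

For Docker Compose deployments, the Dask services can be defined as follows: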
dask-scheduler:
  image: daskdev/dask:2021.4.1
  hostname: dask-scheduler
  ports:
    - "8786:8786"
    - "8787:8787"
  command: ["dask-scheduler"]
  environment:
    - EXTRA_PIP_PACKAGES=s3fs pyarrow numpy==1.19.5 pandas==1.3.1 --upgrade
dask-worker:
  image: daskdev/dask:2021.4.1
  hostname: dask-worker
  command: ["dask-worker", "tcp://dask-scheduler:8786", "--memory-limit=1e9", "--nprocs=1", "--nthreads=1"]
  environment:
    - EXTRA_PIP_PACKAGES=s3fs pyarrow numpy==1.19.5 pandas==1.3.1 --upgrade
What to Expect (Validation)
Results should be computed on the Dask workers and displayed in the Virtualitics AI Platform.
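As a quick sanity check, you can connect a client to the scheduler and confirm that workers have registered before running the Flow; a minimal sketch assuming the scheduler address from the DevOps Note:

from dask.distributed import Client

client = Client("tcp://dask-scheduler:8786")  # use tcp://localhost:8786 from outside the compose network
print(client.scheduler_info()["workers"])     # should list the configured workers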