Context
The WEkEO DASK Gateway provides a managed environment for parallel computing directly within the WEkEO JupyterHub workspace.
DASK is a flexible parallel computing library for Python that enables you to scale your data analysis workflows from your laptop to large clusters. It provides parallel and distributed computing capabilities while using familiar Python APIs like NumPy, Pandas, and xarray.
In this article, we will explain what DASK is, when to use it, and how to create and manage DASK clusters on the WEkEO infrastructure for processing large Copernicus datasets.
For hands-on examples, see the WEkEO DASK GitHub repository, which provides a collection of notebooks for learning how to use the WEkEO Workspace DASK Gateway and scale data processing workflows.
What is DASK?
DASK breaks down large computations into smaller tasks that can be executed in parallel across multiple cores or machines. It provides:
Parallel computing: Execute operations simultaneously across multiple workers
Lazy evaluation: Operations are recorded but not executed until you explicitly request results
Familiar APIs: Drop-in replacements for NumPy arrays (dask.array), Pandas DataFrames (dask.dataframe), and more
Memory efficiency: Work with datasets larger than your available RAM by processing data in chunks
Seamless scaling: The same code runs on your local machine or distributed cluster
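The lazy-evaluation and familiar-API points above can be seen without any cluster at all, using DASK's default local scheduler. A minimal sketch:

```python
import dask.array as da

# A 4,000 x 4,000 array split into 1,000 x 1,000 chunks.
# Nothing is allocated yet: DASK only records a task graph.
x = da.random.random((4_000, 4_000), chunks=(1_000, 1_000))

# NumPy-style operations are also lazy and return instantly
y = (x + x.T).mean(axis=0)

# .compute() runs the graph in parallel and returns a plain NumPy array
result = y.compute()
print(type(result).__name__, result.shape)  # ndarray (4000,)
```

The same two lines defining `y` would run unchanged against a gateway cluster; only where the chunks are executed changes.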
When you use the WEkEO DASK Gateway, you work with:
Scheduler: Coordinates the work across workers
Workers: Execute the actual computations in parallel
Client: Your Jupyter notebook connects to the cluster through a client object
For more detailed information, visit the official DASK documentation.
When to use DASK?
When DASK is useful:
✅ Large datasets: When your data doesn't fit into memory (larger than your available RAM)
✅ Parallel processing: When you want to speed up computations by utilizing multiple CPU cores
✅ Array operations: Working with large multi-dimensional arrays (e.g., climate data, satellite imagery)
✅ Time-series analysis: Processing temporal data from environmental monitoring
When DASK might not be necessary:
❌ Small datasets: Data that easily fits in memory and processes quickly with standard libraries
❌ Simple operations: Basic data analysis that runs quickly without optimization
❌ I/O bound tasks: Operations primarily limited by disk or network speed
❌ Complex dependencies: Tasks with intricate dependencies between operations may not benefit from parallelization
Common use cases with Copernicus Data on WEkEO:
Processing large satellite image archives
Climate data analysis with multi-dimensional arrays (using xarray + DASK)
Time-series analysis of environmental data
Large-scale geospatial computations
Machine learning on large datasets
💡WEkEO Pro Tip: the WEkEO Workspace DASK Gateway is a good starting point to make your code DASK-ready before moving to larger cloud systems to scale up data processing.
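To give the satellite use case some shape, here is a hedged sketch of band arithmetic at raster scale; the `red` and `nir` arrays are synthetic stand-ins (random reflectances), not real Sentinel-2 bands:

```python
import dask.array as da

# Synthetic stand-ins for red and near-infrared reflectance bands;
# in a real workflow these would be loaded from satellite rasters
red = da.random.uniform(0.0, 0.5, size=(4_096, 4_096), chunks=(1_024, 1_024))
nir = da.random.uniform(0.0, 0.8, size=(4_096, 4_096), chunks=(1_024, 1_024))

# NDVI = (NIR - red) / (NIR + red), built lazily and evaluated chunk by chunk
ndvi = (nir - red) / (nir + red)
mean_ndvi = float(ndvi.mean().compute())
print(f"Mean NDVI over the scene: {mean_ndvi:.3f}")
```

Because the reduction happens chunk by chunk, only the small scalar result is pulled back to the notebook.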
Connecting to the WEkEO DASK Gateway
Access the WEkEO JupyterHub workspace
To use the WEkEO DASK Gateway, you first need access to the WEkEO JupyterHub workspace, which provides an interactive Python environment (Jupyter notebooks) directly in the cloud.
⚠️ You must have a WEkEO account to log in and access the workspace.
Download the notebook
This article is also available as a ready-to-use Jupyter notebook, so you can follow the steps directly in your workspace.
Import required libraries
In the WEkEO JupyterHub workspace, create a notebook in your personal folder, then import the necessary libraries for working with the DASK Gateway:
# Import DASK Gateway libraries
from dask_gateway import Gateway, GatewayCluster
# Import DASK and related libraries
import dask.array as da
import xarray as xr
import numpy as np
# For displaying cluster information
from IPython.display import display
Establish a connection
Create a connection to the WEkEO DASK Gateway:
# Create a gateway connection
gateway = Gateway()
print("Successfully connected to the WEkEO DASK Gateway!")
print(f"Gateway address: {gateway.address}")
Configuring and creating a cluster
Configure cluster options
Before creating a cluster, configure the resources for your workers:
# Get the available cluster options
options = gateway.cluster_options()
# Display the options interface (in Jupyter, this shows a widget)
display(options)
# Set worker resources
options.worker_cores = 1 # 1 CPU core per worker
options.worker_memory = 1.0 # 1 GB RAM per worker
print(f"Cluster configured with:")
print(f" - Worker cores: {options.worker_cores}")
print(f" - Worker memory: {options.worker_memory} GB")
Current limitations per cluster:
Number of cores: 2
Memory: 2.00 GiB
Number of workers: 3
Create and scale your cluster
Now create a cluster with your specified configuration:
# Create a new cluster
cluster = GatewayCluster(cluster_options=options)
# Scale the cluster to 2 workers
cluster.scale(2)
print(f"Cluster created!")
print(f"Cluster name: {cluster.name}")
# Display the cluster (shows a dashboard link and status)
display(cluster)
📌Note: The cluster dashboard provides a real-time view of your DASK cluster's performance. You can monitor task progress, memory usage, worker status, and execution timelines.
Connect a client to the cluster
To use the cluster, create a client that connects your notebook to it:
# Get a client connected to the cluster
client = cluster.get_client()
print("Client connected to cluster!")
print(f"Dashboard link: {client.dashboard_link}")
# Display client information
display(client)
💡WEkEO Pro Tip: click the dashboard link to open it in a new tab and monitor your cluster's performance in real-time!
Your first parallel computation
Create a DASK array
Create a large 3D array representing a dataset similar to Earth observation time-series:
# Define dimensions (time, latitude, longitude)
time_steps = 365 # One year of daily data
lat_points = 500 # 500 latitude points
lon_points = 500 # 500 longitude points
# Create a large random array with DASK
# The 'chunks' parameter defines how the array is divided for parallel processing
large_array = da.random.random(
(time_steps, lat_points, lon_points),
chunks=(50, 100, 100) # Process in smaller blocks
)
print(f"Array shape: {large_array.shape}")
print(f"Array size in memory: {large_array.nbytes / 1e9:.2f} GB")
print(f"Chunk size: {large_array.chunksize}")
print(f"Number of chunks: {large_array.npartitions}")
📌Note: Creating the array is instant because DASK uses lazy evaluation - the array hasn't actually been created yet, just the recipe for creating it.
Perform a computation
Calculate the mean across the time dimension, distributed across workers:
# Calculate the mean over the time dimension (lazy operation)
mean_over_time = large_array.mean(axis=0)
print("Mean calculation defined (lazy operation)")
print(f"Result shape will be: {mean_over_time.shape}")
# Compute the result (triggers parallel execution)
result = mean_over_time.compute()
print(f"Computation complete!")
print(f"Result shape: {result.shape}")
print(f"Result sample value: {result[0, 0]:.6f}")
📌Note: Watch the DASK dashboard while the computation runs to see tasks being distributed, memory usage, and progress bars!
Working with xarray and DASK
DASK integrates seamlessly with xarray for Earth observation data:
# Create coordinate arrays
time = np.arange(0, 365)
lat = np.linspace(-90, 90, 500)
lon = np.linspace(-180, 180, 500)
# Create a DASK-backed xarray DataArray
data_dask = da.random.random((365, 500, 500), chunks=(50, 100, 100))
ds = xr.DataArray(
data_dask,
dims=("time", "lat", "lon"),
coords={"time": time, "lat": lat, "lon": lon},
name="temperature"
)
print("Created xarray DataArray with DASK backend:")
display(ds)
# Perform a realistic analysis: select a region and compute statistics
# Select a region (e.g., Europe)
europe = ds.sel(lat=slice(35, 70), lon=slice(-10, 40))
# Calculate temporal mean and standard deviation
temporal_mean = europe.mean(dim="time").compute()
temporal_std = europe.std(dim="time").compute()
print("Regional statistics computed!")
print(f"Mean value range: {float(temporal_mean.min()):.4f} to {float(temporal_mean.max()):.4f}")
print(f"Std dev range: {float(temporal_std.min()):.4f} to {float(temporal_std.max()):.4f}")
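The same DASK-backed structure also supports time-series work. Below is a self-contained sketch (rebuilding a smaller random array, since no real temperature data is involved) that pulls a single pixel and smooths it with a 30-step rolling mean:

```python
import dask.array as da
import numpy as np
import xarray as xr

# Rebuild a small DASK-backed DataArray with the same layout as above
time = np.arange(365)
lat = np.linspace(-90, 90, 100)
lon = np.linspace(-180, 180, 100)
ds = xr.DataArray(
    da.random.random((365, 100, 100), chunks=(73, 50, 50)),
    dims=("time", "lat", "lon"),
    coords={"time": time, "lat": lat, "lon": lon},
    name="temperature",
)

# Nearest-neighbour selection of a single pixel, then a centred
# 30-step rolling mean; only the chunks touching that pixel are computed
point = ds.sel(lat=48.0, lon=2.0, method="nearest")
smoothed = point.rolling(time=30, center=True).mean().compute()
print(smoothed.shape)  # (365,)
```

With real data, `ds` would typically come from `xr.open_dataset(..., chunks=...)` instead; the selection and rolling code stays identical.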
Managing your DASK cluster
Scaling your cluster
Dynamically adjust the number of workers based on your computational needs:
# Scale up to 3 workers
print("Scaling cluster to 3 workers...")
cluster.scale(3)
# Wait a moment for workers to start
import time
time.sleep(5)
# Check current worker count
print(f"Current workers: {len(client.scheduler_info()['workers'])}")
Monitoring cluster resources
Check the status of your cluster:
# Get information about the scheduler and workers
scheduler_info = client.scheduler_info()
print(f"Cluster status:")
print(f" Total workers: {len(scheduler_info['workers'])}")
print(f" Total cores: {sum(w['nthreads'] for w in scheduler_info['workers'].values())}")
print(f" Total memory: {sum(w['memory_limit'] for w in scheduler_info['workers'].values()) / 1e9:.2f} GB")
print(f"\nWorker details:")
for worker_id, worker_info in list(scheduler_info['workers'].items())[:2]: # Show first 2
print(f" {worker_id}:")
print(f" Cores: {worker_info['nthreads']}")
print(f" Memory: {worker_info['memory_limit'] / 1e9:.2f} GB")
Closing your cluster
When finished, always close your cluster to free up resources:
# Close the client connection
client.close()
print("Client closed")
# Shutdown the cluster
cluster.close()
print("Cluster shutdown complete")
⚠️ Leaving clusters running unnecessarily consumes resources and may impact other users. Always clean up when done!
Best practices
Start small: Begin with a small cluster and scale up if needed
Monitor performance: Use the dashboard to identify bottlenecks
Choose appropriate chunks: Aim for chunk sizes of 100-500 MB for optimal performance
Use lazy evaluation: Build your computation pipeline before calling .compute()
Close resources: Always shut down your cluster when finished
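The chunk-size advice is easy to check: a chunk's footprint is its element count times the dtype's byte size. A quick sanity check, using the same shape and chunking as the array earlier in this article:

```python
import numpy as np
import dask.array as da

# Same layout as the earlier example: (365, 500, 500) float64
arr = da.random.random((365, 500, 500), chunks=(50, 100, 100))
chunk_bytes = np.prod(arr.chunksize) * arr.dtype.itemsize
print(f"Chunk size: {chunk_bytes / 1e6:.1f} MB")  # 4.0 MB

# Rechunking toward the 100-500 MB range reduces scheduling overhead,
# provided each worker can hold a few chunks in memory at once
bigger = arr.rechunk((365, 250, 250))
bigger_bytes = np.prod(bigger.chunksize) * bigger.dtype.itemsize
print(f"Chunk size: {bigger_bytes / 1e6:.1f} MB")  # 182.5 MB
```

On the small gateway workers here (1-2 GB each), staying toward the lower end of that range is the safer choice.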
What's next?
Additional resources can be found in our Help Center. Should you require further assistance or wish to provide feedback, feel free to contact us through a chat session available in the bottom right corner of the page.