
Introduction to the WEkEO DASK Gateway

Scale your data processing with parallel computing on WEkEO! Let's discover how to use DASK for efficient analysis of large Earth observation datasets.

Written by Alexandre

Context


The WEkEO DASK Gateway provides a managed environment for parallel computing directly within the WEkEO JupyterHub workspace.

DASK is a flexible parallel computing library for Python that enables you to scale your data analysis workflows from your laptop to large clusters. It provides parallel and distributed computing capabilities while using familiar Python APIs like NumPy, Pandas, and xarray.

In this article, we will explain what DASK is, when to use it, and how to create and manage DASK clusters on the WEkEO infrastructure for processing large Copernicus datasets.

For hands-on examples, see the WEkEO DASK GitHub repository, which provides a collection of notebooks for learning how to use the WEkEO Workspace DASK Gateway and scale data processing workflows.

What is DASK?


DASK breaks down large computations into smaller tasks that can be executed in parallel across multiple cores or machines. It provides:

  • Parallel computing: Execute operations simultaneously across multiple workers

  • Lazy evaluation: Operations are recorded but not executed until you explicitly request results

  • Familiar APIs: Drop-in replacements for NumPy arrays (dask.array), Pandas DataFrames (dask.dataframe), and more

  • Memory efficiency: Work with datasets larger than your available RAM by processing data in chunks

  • Seamless scaling: The same code runs on your local machine or distributed cluster
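The lazy-evaluation and chunking ideas above can be seen in a few lines of plain DASK, run locally with no cluster (a minimal sketch; only the dask library is assumed to be installed):

```python
# Minimal illustration of lazy evaluation with dask.array
# (runs locally, no cluster required)
import dask.array as da

# Build a "recipe" for a 1000x1000 array split into 250x250 chunks
x = da.ones((1000, 1000), chunks=(250, 250))

# This records the operations but computes nothing yet
total = (x + 1).sum()

# .compute() triggers the actual (parallel) execution
print(total.compute())  # 2000000.0
```

The same code runs unchanged on a distributed cluster once a client is connected, which is what makes the scaling seamless.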

When you use the WEkEO DASK Gateway, you work with:

  1. Scheduler: Coordinates the work across workers

  2. Workers: Execute the actual computations in parallel

  3. Client: Your Jupyter notebook connects to the cluster through a client object

For more detailed information, visit the official DASK documentation.
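The scheduler/workers/client trio can also be observed locally using dask.distributed, with no gateway involved (a sketch, assuming the distributed package is installed; on WEkEO, the Gateway provisions the scheduler and workers for you):

```python
from dask.distributed import Client
import dask.array as da

# Client(processes=False) starts an in-process scheduler and worker,
# mirroring the scheduler / worker / client roles of the Gateway
client = Client(processes=False, n_workers=1, threads_per_worker=2)

x = da.ones((1000, 1000), chunks=(250, 250))
result = x.sum().compute()  # tasks go to the scheduler, run on the workers
print(result)  # 1000000.0

client.close()
```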

When to use DASK?


When DASK is useful:

Large datasets: When your data doesn't fit into memory (larger than your available RAM)

Parallel processing: When you want to speed up computations by utilizing multiple CPU cores

Array operations: Working with large multi-dimensional arrays (e.g., climate data, satellite imagery)

Time-series analysis: Processing temporal data from environmental monitoring

When DASK might not be necessary:

Small datasets: Data that easily fits in memory and processes quickly with standard libraries

Simple operations: Basic data analysis that runs quickly without optimization

I/O bound tasks: Operations primarily limited by disk or network speed

Complex dependencies: Tasks with intricate dependencies between operations may not benefit from parallelization

Common use cases with Copernicus Data on WEkEO:

  • Processing large satellite image archives

  • Climate data analysis with multi-dimensional arrays (using xarray + DASK)

  • Time-series analysis of environmental data

  • Large-scale geospatial computations

  • Machine learning on large datasets

💡WEkEO Pro Tip: The WEkEO Workspace DASK Gateway is a good starting point for making your code DASK-ready before scaling up data processing on larger cloud systems.

Connecting to the WEkEO DASK Gateway


Access the WEkEO JupyterHub workspace

To use the WEkEO DASK Gateway, you first need access to the WEkEO JupyterHub workspace, which provides an interactive Python environment (Jupyter notebooks) directly in the cloud.

⚠️ You must have a WEkEO account to log in and access the workspace.

Download the notebook

This article is also available as a ready-to-use Jupyter notebook, so you can follow the steps directly in your workspace.

Import required libraries

In the WEkEO JupyterHub workspace, create a notebook in your personal folder, then import the necessary libraries for working with the DASK Gateway:

# Import DASK Gateway libraries
from dask_gateway import Gateway, GatewayCluster

# Import DASK and related libraries
import dask.array as da
import xarray as xr
import numpy as np

# For displaying cluster information
from IPython.display import display

Establish a connection

Create a connection to the WEkEO DASK Gateway:

# Create a gateway connection
gateway = Gateway()

print("Successfully connected to the WEkEO DASK Gateway!")
print(f"Gateway address: {gateway.address}")

Configuring and creating a cluster


Configure cluster options

Before creating a cluster, configure the resources for your workers:

# Get the available cluster options
options = gateway.cluster_options()

# Display the options interface (in Jupyter, this shows a widget)
display(options)

# Set worker resources
options.worker_cores = 1 # 1 CPU core per worker
options.worker_memory = 1.0 # 1 GB RAM per worker

print(f"Cluster configured with:")
print(f" - Worker cores: {options.worker_cores}")
print(f" - Worker memory: {options.worker_memory} GB")

Current resource limits per cluster:

  • Number of cores: 2

  • Memory: 2.00 GiB

  • Number of workers: 3

Create and scale your cluster

Now create a cluster with your specified configuration:

# Create a new cluster
cluster = GatewayCluster(cluster_options=options)

# Scale the cluster to 2 workers
cluster.scale(2)

print(f"Cluster created!")
print(f"Cluster name: {cluster.name}")

# Display the cluster (shows a dashboard link and status)
display(cluster)

📌Note: The cluster dashboard provides a real-time view of your DASK cluster's performance. You can monitor task progress, memory usage, worker status, and execution timelines.

Connect a client to the cluster

To use the cluster, create a client that connects your notebook to it:

# Get a client connected to the cluster
client = cluster.get_client()

print("Client connected to cluster!")
print(f"Dashboard link: {client.dashboard_link}")

# Display client information
display(client)

💡WEkEO Pro Tip: Click the dashboard link to open it in a new tab and monitor your cluster's performance in real-time!

Your first parallel computation


Create a DASK array

Create a large 3D array representing a dataset similar to Earth observation time-series:

# Define dimensions (time, latitude, longitude)
time_steps = 365 # One year of daily data
lat_points = 500 # 500 latitude points
lon_points = 500 # 500 longitude points

# Create a large random array with DASK
# The 'chunks' parameter defines how the array is divided for parallel processing
large_array = da.random.random(
    (time_steps, lat_points, lon_points),
    chunks=(50, 100, 100)  # Process in smaller blocks
)

print(f"Array shape: {large_array.shape}")
print(f"Array size (if materialized): {large_array.nbytes / 1e9:.2f} GB")
print(f"Chunk size: {large_array.chunksize}")
print(f"Number of chunks: {large_array.npartitions}")

📌Note: Creating the array is instant because DASK uses lazy evaluation - the array hasn't actually been created yet, just the recipe for creating it.

Perform a computation

Calculate the mean across the time dimension, distributed across workers:

# Calculate the mean over the time dimension (lazy operation)
mean_over_time = large_array.mean(axis=0)

print("Mean calculation defined (lazy operation)")
print(f"Result shape will be: {mean_over_time.shape}")

# Compute the result (triggers parallel execution)
result = mean_over_time.compute()

print(f"Computation complete!")
print(f"Result shape: {result.shape}")
print(f"Result sample value: {result[0, 0]:.6f}")

📌Note: Watch the DASK dashboard while the computation runs to see tasks being distributed, memory usage, and progress bars!

Working with xarray and DASK

DASK integrates seamlessly with xarray for Earth observation data:

# Create coordinate arrays
time = np.arange(0, 365)
lat = np.linspace(-90, 90, 500)
lon = np.linspace(-180, 180, 500)

# Create a DASK-backed xarray DataArray
data_dask = da.random.random((365, 500, 500), chunks=(50, 100, 100))

ds = xr.DataArray(
    data_dask,
    dims=("time", "lat", "lon"),
    coords={"time": time, "lat": lat, "lon": lon},
    name="temperature"
)

print("Created xarray DataArray with DASK backend:")
display(ds)

# Perform a realistic analysis: select a region and compute statistics
# Select a region (e.g., Europe)
europe = ds.sel(lat=slice(35, 70), lon=slice(-10, 40))

# Calculate temporal mean and standard deviation
temporal_mean = europe.mean(dim="time").compute()
temporal_std = europe.std(dim="time").compute()

print("Regional statistics computed!")
print(f"Mean value range: {float(temporal_mean.min()):.4f} to {float(temporal_mean.max()):.4f}")
print(f"Std dev range: {float(temporal_std.min()):.4f} to {float(temporal_std.max()):.4f}")

Managing your DASK cluster


Scaling your cluster

Dynamically adjust the number of workers based on your computational needs:

# Scale up to 3 workers
print("Scaling cluster to 3 workers...")
cluster.scale(3)

# Wait a moment for workers to start
import time
time.sleep(5)

# Check current worker count
print(f"Current workers: {len(client.scheduler_info()['workers'])}")

Monitoring cluster resources

Check the status of your cluster:

# Get information about the scheduler and workers
scheduler_info = client.scheduler_info()

print(f"Cluster status:")
print(f" Total workers: {len(scheduler_info['workers'])}")
print(f" Total cores: {sum(w['nthreads'] for w in scheduler_info['workers'].values())}")
print(f" Total memory: {sum(w['memory_limit'] for w in scheduler_info['workers'].values()) / 1e9:.2f} GB")

print("\nWorker details:")
for worker_id, worker_info in list(scheduler_info['workers'].items())[:2]:  # Show first 2
    print(f"  {worker_id}:")
    print(f"    Cores: {worker_info['nthreads']}")
    print(f"    Memory: {worker_info['memory_limit'] / 1e9:.2f} GB")

Closing your cluster

When finished, always close your cluster to free up resources:

# Close the client connection
client.close()
print("Client closed")

# Shutdown the cluster
cluster.close()
print("Cluster shutdown complete")

⚠️ Leaving clusters running unnecessarily consumes resources and may impact other users. Always clean up when done!
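One way to make the cleanup automatic is a `with` block, which closes the cluster even if a computation raises an error. A minimal sketch, shown here with a local stand-in cluster so it runs without a gateway (dask_gateway's GatewayCluster supports the same context-manager usage):

```python
# Context managers guarantee cleanup, even on errors
from dask.distributed import LocalCluster, Client
import dask.array as da

with LocalCluster(n_workers=1, processes=False) as cluster:
    with Client(cluster) as client:
        result = da.ones((100, 100), chunks=(50, 50)).sum().compute()

# Cluster and client are already closed at this point
print(result)  # 10000.0
```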

Best practices


  1. Start small: Begin with a small cluster and scale up if needed

  2. Monitor performance: Use the dashboard to identify bottlenecks

  3. Choose appropriate chunks: Aim for chunk sizes of 100-500 MB for optimal performance

  4. Use lazy evaluation: Build your computation pipeline before calling .compute()

  5. Close resources: Always shut down your cluster when finished
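For the chunk-size guideline, you can check the size of a chunk directly from the array's metadata. A quick sketch using this article's example array (note that the tutorial's (50, 100, 100) chunks come to only 4 MB, deliberately small for the gateway's modest worker limits; for larger clusters, aim closer to the 100-500 MB range):

```python
import dask.array as da
import numpy as np

# The tutorial array: float64 values, chunked as (50, 100, 100)
arr = da.ones((365, 500, 500), chunks=(50, 100, 100))

# Bytes in one full chunk: 50 * 100 * 100 elements * 8 bytes each
chunk_bytes = np.prod(arr.chunksize) * arr.dtype.itemsize
print(f"Chunk size: {chunk_bytes / 1e6:.1f} MB")  # Chunk size: 4.0 MB
```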

What's next?


Additional resources can be found in our Help Center. Should you require further assistance or wish to provide feedback, feel free to contact us through a chat session available in the bottom right corner of the page.
