Parallelized Cloud Computing with Dask for Geoscientists#

Tutorial by Ellianna Abrahams

The following notebook includes materials from notebooks written by Hamed Alemohammad, Ryan Abernathey, Planetary Computer, stackstac, and Dask.

For those that would like to follow along for the introductory section, please install graphviz.

conda install python-graphviz

An Introduction to Dask for Parallel Computing#

Dask is a Python library designed to facilitate parallel computing and distributed data processing. It is particularly useful for handling computations that exceed the available memory of a single machine and for parallelizing tasks across multiple cores or even distributed clusters of machines. The primary goal of Dask is to provide a user-friendly and Pythonic interface for developers to scale their data analysis and processing workflows.

Dask consists of two main components:

  1. “Big Data” Collections: Dask provides collections that extend common Python data structures, such as arrays, dataframes, and lists, to handle larger-than-memory or distributed datasets. These collections are designed to mimic the interfaces of popular libraries like NumPy and Pandas, making it easier for developers to transition from single-machine to distributed computing environments.

  2. Dynamic Task Scheduling: Dask provides scheduling that manages the distribution and execution of tasks across the available computing resources. It ensures that computations are carried out efficiently in a parallelized manner.

Figure: the Dask distributed system. High-level collections are used to generate task graphs, which can be executed by schedulers on a single machine or a cluster.

Some reasons why you should consider using Dask for your large data:

  • Scalable Parallel Computing: Dask enables the parallelization of computations across multiple cores and distributed clusters, allowing users to efficiently handle data processing tasks that exceed the memory capacity of a single machine.

  • Lazy Evaluation: Dask adopts a lazy evaluation approach, deferring the execution of operations until explicitly requested. This optimizes the scheduling and execution of computations, enhancing overall efficiency (see the short sketch after this list).

  • Flexibility Across Data Structures: Dask provides “Big Data” collections that extend common Python data structures (e.g., arrays, dataframes, bags) to handle larger-than-memory or distributed datasets. This flexibility allows users to seamlessly transition between single-machine and distributed computing environments.

  • Integration with Distributed Computing: Dask seamlessly integrates with a task scheduler, facilitating the scaling of computations from a single machine to distributed computing clusters. This adaptability makes Dask a versatile tool for handling diverse big data processing scenarios.
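
Both components show up even in a tiny, self-contained sketch using dask.delayed (illustrative only, and not part of the tutorial’s data): the decorated calls merely record tasks in a graph, and nothing executes until compute() hands that graph to a scheduler.

import dask

@dask.delayed
def load(i):
    # stand-in for reading one file or one chunk of data
    return i * 2

@dask.delayed
def combine(values):
    # stand-in for a reduction over the loaded pieces
    return sum(values)

total = combine([load(i) for i in range(4)])  # builds a task graph instantly; no work happens yet
result = total.compute()                      # the scheduler runs the four load tasks in parallel, then combine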

In essence, Dask offers a versatile and user-friendly framework for scalable, parallel, and distributed computing in Python, making it well-suited for a wide range of data analysis and processing tasks.



Dask Arrays#

Today, we’ll be focusing on processing image data, i.e. array data, with dask, but it can also be used with most familiar Python data structures, such as dataframes (pandas-like).

A dask array is very similar to a numpy or xarray array. However, a dask array is only symbolic, and doesn’t initially hold any data. It is structured to represent the shape, chunks, and computations needed to generate the data. Dask remains efficient by delaying computations until the full computation thread is built up and the data is requested. This mode of operation is called “lazy evaluation,” and it allows users to build up a symbolic thread of calculations before turning them over to the scheduler for execution.

Dask arrays arrange many numpy arrays into a grid. The underlying arrays are stored either locally or on a remote machine, such as in the cloud.

Figure: visualization of a dask array.

Let’s compare some numpy arrays to dask arrays to illustrate further how Dask operates. We’ll begin by creating a numpy array of ones.

import numpy as np
shape = (1000, 4000)
ones_np = np.ones(shape)
ones_np
array([[1., 1., 1., ..., 1., 1., 1.],
       [1., 1., 1., ..., 1., 1., 1.],
       [1., 1., 1., ..., 1., 1., 1.],
       ...,
       [1., 1., 1., ..., 1., 1., 1.],
       [1., 1., 1., ..., 1., 1., 1.],
       [1., 1., 1., ..., 1., 1., 1.]])

Let’s look at the size of this array in memory:

print(f'Size in MB: {ones_np.nbytes / (1024 * 1024)}')
Size in MB: 30.517578125

Now, we’ll create the same array in dask.

import dask.array as da
ones = da.ones(shape)
ones
            Array                      Chunk
Bytes       30.52 MiB                  30.52 MiB
Shape       (1000, 4000)               (1000, 4000)
Dask graph  1 chunks in 1 graph layer
Data type   float64 numpy.ndarray

Notice how this array comes with a dask visualization of our array and includes the word “chunks.” Chunks are the way that dask splits the array into sub-arrays. We did not specify how we wanted to chunk the array, so dask just used one chunk for the whole array. At this point, the dask array is very similar to the numpy array and isn’t structured to take advantage of dask functionality.

Specifying Chunks#

To leverage the full power of dask, we’ll split our array into specified chunks. The Dask documentation provides several ways to specify chunks. In this demonstration, we’ll use a block shape.

chunk_shape = (1000, 1000)
ones = da.ones(shape, chunks=chunk_shape)
ones
            Array                      Chunk
Bytes       30.52 MiB                  7.63 MiB
Shape       (1000, 4000)               (1000, 1000)
Dask graph  4 chunks in 1 graph layer
Data type   float64 numpy.ndarray

At this point, all we have is a symbolic representation of the array, including its shape, data type, and chunk size. This is an example of lazy evaluation; unlike numpy, dask has not generated any data yet.
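
As an aside, the block shape we passed above is only one of the forms that the chunks argument accepts; here is a hedged sketch of a few alternatives, per our reading of the dask.array documentation (none of these are needed for the rest of this tutorial):

da.ones(shape, chunks=1000)                # uniform chunks of length 1000 along every axis
da.ones(shape, chunks=(1000, 2000))        # a block shape, one chunk length per axis
da.ones(shape, chunks={0: 1000, 1: 2000})  # the same block shape written as an axis: size dict
da.ones(shape, chunks="auto")              # let dask choose chunk sizes from its configured target size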

In order to generate data, we need to send the data collection to the scheduler, which we can do by calling the compute function.

ones.compute()
array([[1., 1., 1., ..., 1., 1., 1.],
       [1., 1., 1., ..., 1., 1., 1.],
       [1., 1., 1., ..., 1., 1., 1.],
       ...,
       [1., 1., 1., ..., 1., 1., 1.],
       [1., 1., 1., ..., 1., 1., 1.],
       [1., 1., 1., ..., 1., 1., 1.]])

Dask has a nifty visualize function that illustrates the symbolic operations that ran when we called compute.

ones.visualize()
Figure: task graph produced by ones.visualize(), showing the four chunked np.ones tasks (dask_for_geoscientists_21_0.png).

We can see that we stored four “ones-like” arrays, one in each chunk. To generate the data, the dask scheduler calls np.ones four times and then concatenates all of them together into one array.

Rather than immediately loading a dask array (which will store all of the data in RAM), we usually would like to do some kind of analysis on the data and don’t necessarily want to store the full raw data in memory. Let’s see how dask handles a more complex computation that reduces the data.

complex_calculation = (ones * ones[::-1, ::-1]).mean()
complex_calculation.visualize()
Figure: task graph for the lazy (ones * ones[::-1, ::-1]).mean() computation (dask_for_geoscientists_24_0.png).

We can see Dask’s strategy for running the calculation thread on our data array. The power of Dask is that it automatically designs an execution plan appropriate for complex threads of operations on big data.


Monitor Dask Usage via Dask Dashboard#

Dask offers visualization tools for monitoring how dask is using memory across your cluster of CPUs and GPUs. This is important when working with large data because, as we’ll see later in this tutorial, certain computations can temporarily increase memory usage, potentially exceeding the memory available while a task is being run, or can double up on computation time, making them better to complete outside of dask.

Let’s take a quick look at the dask dashboard which is available to us in CryoCloud.

from dask.distributed import Client
client = Client()  # start distributed scheduler locally.
client

Client: Client-7e109575-977d-11ee-8e9b-c67ecc8c5715
Connection method: Cluster object
Cluster type: distributed.LocalCluster
Dashboard: /user/elliesch/proxy/8787/status

On the left of our screen we’re going to open the dask dashboard panel. We’ll copy the dashboard path listed in the client output above into it to connect our CryoCloud instance to the dashboard. Once the dashboard is up and running, please click on “Task Stream,” “Progress,” “Workers Memory,” and “Graph.”

For the sake of time today, we’ll just look at a couple of the assessment tools available to us in the dask dashboard, but the dask documentation has a more extensive list of the diagnostics available to you through the dashboard.
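
If you want more control over the resources that back the dashboard, you can also build the local cluster explicitly before attaching a client. The worker counts and memory limits below are purely illustrative (a sketch, not a CryoCloud recommendation), and you would not run this alongside the Client() we already started above:

from dask.distributed import Client, LocalCluster

# Illustrative settings only; size these to the machine or instance you are actually on.
cluster = LocalCluster(n_workers=4, threads_per_worker=2, memory_limit="4GB")
client = Client(cluster)
print(client.dashboard_link)  # the URL (or its /proxy/8787/status path) to paste into the dashboard panel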


Dask For Geoscientific Images#

For this section of the tutorial, we’re going to move into using dask with data types that are of interest to us in the Cryosphere. We’ll work with Landsat data and use dask’s native functionality to stream this data in directly from the cloud while only bringing in the pixels that we are interested in. Once we’ve selected our data of interest, we’ll use machine learning tools that interact with dask to do a basic image segmentation task.

Learning Goals:#

  • Interacting with a Large Image using Dask

    • Query a Landsat time-series using stackstac with Planetary Computer

    • Refine the image query before downloading data

    • Import the refined image

  • Running Custom Functions on Dask + xarray

    • Using a custom function for analysis before importing data

    • Implementing scikit-image on dask arrays

Computing Environment#

We will set up our computing environment with package imports. Today we’ll be working with planetary computer, dask, xarray, and scikit-image.

Tip: If you need to import a library that is not pre-installed, use pip install <package-name> in a Jupyter notebook cell alone to install it for this instance of CryoCloud. On CryoCloud, your package installation will not persist between logins.

pip install planetary-computer
Collecting planetary-computer
  Using cached planetary_computer-1.0.0-py3-none-any.whl (14 kB)
Requirement already satisfied: click>=7.1 in /srv/conda/envs/notebook/lib/python3.10/site-packages (from planetary-computer) (8.1.7)
Requirement already satisfied: pydantic>=1.7.3 in /srv/conda/envs/notebook/lib/python3.10/site-packages (from planetary-computer) (2.5.2)
Requirement already satisfied: requests>=2.25.1 in /srv/conda/envs/notebook/lib/python3.10/site-packages (from planetary-computer) (2.31.0)
Requirement already satisfied: pystac>=1.0.0 in /srv/conda/envs/notebook/lib/python3.10/site-packages (from planetary-computer) (1.9.0)
Requirement already satisfied: pytz>=2020.5 in /srv/conda/envs/notebook/lib/python3.10/site-packages (from planetary-computer) (2023.3.post1)
Requirement already satisfied: packaging in /srv/conda/envs/notebook/lib/python3.10/site-packages (from planetary-computer) (23.2)
Requirement already satisfied: pystac-client>=0.2.0 in /srv/conda/envs/notebook/lib/python3.10/site-packages (from planetary-computer) (0.5.1)
Collecting python-dotenv
  Using cached python_dotenv-1.0.0-py3-none-any.whl (19 kB)
Requirement already satisfied: pydantic-core==2.14.5 in /srv/conda/envs/notebook/lib/python3.10/site-packages (from pydantic>=1.7.3->planetary-computer) (2.14.5)
Requirement already satisfied: typing-extensions>=4.6.1 in /srv/conda/envs/notebook/lib/python3.10/site-packages (from pydantic>=1.7.3->planetary-computer) (4.8.0)
Requirement already satisfied: annotated-types>=0.4.0 in /srv/conda/envs/notebook/lib/python3.10/site-packages (from pydantic>=1.7.3->planetary-computer) (0.6.0)
Requirement already satisfied: python-dateutil>=2.7.0 in /srv/conda/envs/notebook/lib/python3.10/site-packages (from pystac>=1.0.0->planetary-computer) (2.8.2)
Requirement already satisfied: urllib3<3,>=1.21.1 in /srv/conda/envs/notebook/lib/python3.10/site-packages (from requests>=2.25.1->planetary-computer) (1.26.18)
Requirement already satisfied: idna<4,>=2.5 in /srv/conda/envs/notebook/lib/python3.10/site-packages (from requests>=2.25.1->planetary-computer) (3.4)
Requirement already satisfied: charset-normalizer<4,>=2 in /srv/conda/envs/notebook/lib/python3.10/site-packages (from requests>=2.25.1->planetary-computer) (3.3.0)
Requirement already satisfied: certifi>=2017.4.17 in /srv/conda/envs/notebook/lib/python3.10/site-packages (from requests>=2.25.1->planetary-computer) (2023.11.17)
Requirement already satisfied: six>=1.5 in /srv/conda/envs/notebook/lib/python3.10/site-packages (from python-dateutil>=2.7.0->pystac>=1.0.0->planetary-computer) (1.16.0)
Installing collected packages: python-dotenv, planetary-computer
Successfully installed planetary-computer-1.0.0 python-dotenv-1.0.0
Note: you may need to restart the kernel to use updated packages.
pip install stackstac
Collecting stackstac
  Using cached stackstac-0.5.0-py3-none-any.whl (63 kB)
Requirement already satisfied: pyproj<4.0.0,>=3.0.0 in /srv/conda/envs/notebook/lib/python3.10/site-packages (from stackstac) (3.6.1)
Requirement already satisfied: dask[array]>=2022.1.1 in /srv/conda/envs/notebook/lib/python3.10/site-packages (from stackstac) (2022.11.0)
Requirement already satisfied: xarray>=0.18 in /srv/conda/envs/notebook/lib/python3.10/site-packages (from stackstac) (2023.12.0)
Requirement already satisfied: rasterio<2.0.0,>=1.3.0 in /srv/conda/envs/notebook/lib/python3.10/site-packages (from stackstac) (1.3.8)
Requirement already satisfied: packaging>=20.0 in /srv/conda/envs/notebook/lib/python3.10/site-packages (from dask[array]>=2022.1.1->stackstac) (23.2)
Requirement already satisfied: partd>=0.3.10 in /srv/conda/envs/notebook/lib/python3.10/site-packages (from dask[array]>=2022.1.1->stackstac) (1.4.1)
Requirement already satisfied: cloudpickle>=1.1.1 in /srv/conda/envs/notebook/lib/python3.10/site-packages (from dask[array]>=2022.1.1->stackstac) (3.0.0)
Requirement already satisfied: pyyaml>=5.3.1 in /srv/conda/envs/notebook/lib/python3.10/site-packages (from dask[array]>=2022.1.1->stackstac) (5.4.1)
Requirement already satisfied: toolz>=0.8.2 in /srv/conda/envs/notebook/lib/python3.10/site-packages (from dask[array]>=2022.1.1->stackstac) (0.12.0)
Requirement already satisfied: click>=7.0 in /srv/conda/envs/notebook/lib/python3.10/site-packages (from dask[array]>=2022.1.1->stackstac) (8.1.7)
Requirement already satisfied: fsspec>=0.6.0 in /srv/conda/envs/notebook/lib/python3.10/site-packages (from dask[array]>=2022.1.1->stackstac) (2023.6.0)
Requirement already satisfied: numpy>=1.18 in /srv/conda/envs/notebook/lib/python3.10/site-packages (from dask[array]>=2022.1.1->stackstac) (1.23.5)
Requirement already satisfied: certifi in /srv/conda/envs/notebook/lib/python3.10/site-packages (from pyproj<4.0.0,>=3.0.0->stackstac) (2023.11.17)
Requirement already satisfied: click-plugins in /srv/conda/envs/notebook/lib/python3.10/site-packages (from rasterio<2.0.0,>=1.3.0->stackstac) (1.1.1)
Requirement already satisfied: snuggs>=1.4.1 in /srv/conda/envs/notebook/lib/python3.10/site-packages (from rasterio<2.0.0,>=1.3.0->stackstac) (1.4.7)
Requirement already satisfied: setuptools in /srv/conda/envs/notebook/lib/python3.10/site-packages (from rasterio<2.0.0,>=1.3.0->stackstac) (68.2.2)
Requirement already satisfied: attrs in /srv/conda/envs/notebook/lib/python3.10/site-packages (from rasterio<2.0.0,>=1.3.0->stackstac) (23.1.0)
Requirement already satisfied: affine in /srv/conda/envs/notebook/lib/python3.10/site-packages (from rasterio<2.0.0,>=1.3.0->stackstac) (2.4.0)
Requirement already satisfied: cligj>=0.5 in /srv/conda/envs/notebook/lib/python3.10/site-packages (from rasterio<2.0.0,>=1.3.0->stackstac) (0.7.2)
Requirement already satisfied: pandas>=1.4 in /srv/conda/envs/notebook/lib/python3.10/site-packages (from xarray>=0.18->stackstac) (1.5.1)
Requirement already satisfied: python-dateutil>=2.8.1 in /srv/conda/envs/notebook/lib/python3.10/site-packages (from pandas>=1.4->xarray>=0.18->stackstac) (2.8.2)
Requirement already satisfied: pytz>=2020.1 in /srv/conda/envs/notebook/lib/python3.10/site-packages (from pandas>=1.4->xarray>=0.18->stackstac) (2023.3.post1)
Requirement already satisfied: locket in /srv/conda/envs/notebook/lib/python3.10/site-packages (from partd>=0.3.10->dask[array]>=2022.1.1->stackstac) (1.0.0)
Requirement already satisfied: pyparsing>=2.1.6 in /srv/conda/envs/notebook/lib/python3.10/site-packages (from snuggs>=1.4.1->rasterio<2.0.0,>=1.3.0->stackstac) (3.1.1)
Requirement already satisfied: six>=1.5 in /srv/conda/envs/notebook/lib/python3.10/site-packages (from python-dateutil>=2.8.1->pandas>=1.4->xarray>=0.18->stackstac) (1.16.0)
Installing collected packages: stackstac
Successfully installed stackstac-0.5.0
Note: you may need to restart the kernel to use updated packages.
import numpy as np
import pandas as pd
import xarray as xr
import matplotlib.pyplot as plt
import os

import pystac_client
from pystac.extensions.projection import ProjectionExtension as proj

import planetary_computer
import rasterio
import rasterio.features
import stackstac
import pyproj

import dask.diagnostics

Interacting with a Large Image Using Dask#

Data Import using Planetary Computer and Stackstac#

The Planetary Computer package allows users to stream data stored in Microsoft’s Planetary Computer Catalog directly from the cloud. When combined with stackstac tools, this allows users to query an area of interest and planetary computer will stream all the requested data from that area using dask.

In this tutorial, we are using Planetary Computer to interact with images from Landsat. In particular, we’ll query images that are around the San Francisco Bay, where this tutorial is taking place. To initialize planetary computer, we’ll define a catalog variable and define the coordinate bounds of our image query using rasterio.

catalog = pystac_client.Client.open(
    "https://planetarycomputer.microsoft.com/api/stac/v1",
    modifier=planetary_computer.sign_inplace,
)
area_of_interest = {
    "type": "Polygon",
    "coordinates": [
        [
            [-124.031297, 36.473972],
            [-120.831297, 36.473972],
            [-120.831297, 39.073972],
            [-124.031297, 39.073972],
            [-124.031297, 36.473972],
        ]
    ],
}

bounds_latlon = rasterio.features.bounds(area_of_interest)

Now we’ll define a time range to search over and the Landsat bands that we are interested in. In our query we limit the data to images that have cloud cover (eo:cloud_cover) less than (lt) 20%.

time_range = '2020-01-31/2021-01-31'
bbox = bounds_latlon

band_names = ['red','green','blue', 'nir08', 'swir16']

search = catalog.search(collections=["landsat-c2-l2"], bbox=bbox, 
                        datetime=time_range, 
                        query={'eo:cloud_cover': {'lt': 20}
                              })
items = search.get_all_items()

print(f'There are {len(items)} images that fit your search criteria in the database.')
There are 183 images that fit your search criteria in the database.
print(type(items))
<class 'pystac.item_collection.ItemCollection'>

Notice how items has created a pystac item collection of all the relevant image slices from Landsat that are located within our coordinate bounds. Now we’ll use stackstac to turn these into a Dask xarray that we can interact with.

item = items[0]
epsg = proj.ext(item).epsg

stack = stackstac.stack(items, epsg=epsg, assets=['red','green','blue', 'nir08', 'swir16'],
                        bounds_latlon=bounds_latlon, resolution=30)

stack
<xarray.DataArray 'stackstac-6f74f18ec9af45035c661e3ac7f51d1b' (time: 183,
                                                                band: 5,
                                                                y: 9674, x: 9558)>
dask.array<fetch_raster_window, shape=(183, 5, 9674, 9558), dtype=float64, chunksize=(1, 1, 1024, 1024), chunktype=numpy.ndarray>
Coordinates: (12/27)
  * time                         (time) datetime64[ns] 2020-02-06T18:51:55.12...
    id                           (time) <U31 'LC08_L2SP_045033_20200206_02_T1...
  * band                         (band) <U6 'red' 'green' ... 'nir08' 'swir16'
  * x                            (x) float64 4.076e+05 4.076e+05 ... 6.943e+05
  * y                            (y) float64 4.327e+06 4.327e+06 ... 4.037e+06
    landsat:wrs_type             <U1 '2'
    ...                           ...
    view:off_nadir               int64 0
    landsat:cloud_cover_land     (time) float64 0.19 0.0 0.94 ... 3.0 0.0 5.41
    proj:epsg                    int64 32610
    raster:bands                 object {'scale': 2.75e-05, 'nodata': 0, 'off...
    title                        (band) <U28 'Red Band' ... 'Short-wave Infra...
    epsg                         int64 32610
Attributes:
    spec:        RasterSpec(epsg=32610, bounds=(407580, 4037010, 694320, 4327...
    crs:         epsg:32610
    transform:   | 30.00, 0.00, 407580.00|\n| 0.00,-30.00, 4327230.00|\n| 0.0...
    resolution:  30

That was fast for a large image cutout! Let’s figure out why. Take a look at the bottom of your CryoCloud screen and check your memory. You’ll notice that it should be on the order of MBs at this point, and this is because we haven’t yet pulled any data into our CryoCloud environment. Instead, we have the locations of all the requested Landsat images in Planetary Computer, ready to stream into an initialized xarray.

Under the hood, stackstac is using dask to do this. What we have at the moment is the “lazy” load we discussed above. dask allows you to perform parallel and distributed computing on larger-than-memory datasets, and eagerly pulling everything in would be inefficient, so lazy evaluation delays the execution of operations, in this case pulling the data into your CryoCloud instance, until the results are actually needed. This approach enables dask to efficiently handle large computations. What we see in the printout above is a symbolic representation of the xarray dimensions for our data.

Why is this evaluation using any memory at all? Check above and see that the table describing your xarray has a row named “Dask Graph.” There we see that our data would be divided into chunks and graph layers. As part of the lazy load, dask has initialized these chunks as empty variables and that is what is currently using memory.
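
If you want to see that chunk structure directly, you can inspect the dask array sitting inside the xarray object. A small sketch, assuming stack is the stackstac DataArray from above; none of this triggers any computation:

from dask.utils import format_bytes

darr = stack.data                 # the lazy dask array backing the xarray DataArray
print(darr.chunksize)             # shape of a single chunk, e.g. (1, 1, 1024, 1024)
print(darr.npartitions)           # total number of chunks in the task graph
print(format_bytes(darr.nbytes / darr.npartitions))  # rough size of one chunk if it were loaded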

How big would this dataset be if we held it all in memory?

print('projected dataset size (GB): ', stack.nbytes/1e9)
projected dataset size (GB):  676.83715344

Refining our Image Before Data Import#

We know that we have a time-series of 183 images, but perhaps we’re actually interested in behavior averaged over monthly timescales in this region, and we only want an RGB image. We can manipulate our stackstac array just like we would a regular xarray.

rgb = stack.sel(band=['red', 'green', 'blue'])
monthly = rgb.resample(time="MS").median("time", keep_attrs=True)
/srv/conda/envs/notebook/lib/python3.10/site-packages/dask/array/core.py:1760: PerformanceWarning: Increasing number of chunks by factor of 17
  return da_func(*args, **kwargs)

Notice how we got a warning that dask is automatically adjusting the chunk size for our xarray object. In dask, the chunk size is a critical parameter that determines how the data is divided into smaller, manageable pieces for distributed processing. When you create or manipulate a dask-enabled xarray dataset, dask tries to find an appropriate chunk size based on the size of the data and the available computing resources. This adjustment is made to optimize computational performance once we finally compute our evaluation. For now, we are still in a lazy state, as evidenced by how little memory we are taking up.
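
If the automatic choice isn’t what you want, you can rechunk the still-lazy array yourself with xarray’s chunk method. A hedged sketch; the chunk sizes here are illustrative and should be balanced against task overhead and worker memory:

monthly_rechunked = monthly.chunk({"time": 1, "y": 1024, "x": 1024})  # still lazy, just a new chunk layout
monthly_rechunked.data.chunksize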

We have queried for a rather large image size, and if we were to pull all of that down to our local machine, we might overwhelm the amount of memory available to us on our current CryoCloud instance. Luckily, we can keep using our dask-enabled xarray to further refine our query before pulling the data in. Let’s reproject our data to UTM coordinates so we can pick a specific area of interest based on a length of distance in the x/y plane.

lon, lat = -122.4312, 37.7739

x_utm, y_utm = pyproj.Proj(monthly.crs)(lon, lat)
buffer = 6000  # meters
area_of_interest = monthly.loc[..., y_utm+buffer:y_utm-buffer, x_utm-buffer:x_utm+buffer]
area_of_interest
<xarray.DataArray 'stackstac-6f74f18ec9af45035c661e3ac7f51d1b' (time: 12,
                                                                band: 3,
                                                                y: 400, x: 400)>
dask.array<getitem, shape=(12, 3, 400, 400), dtype=float64, chunksize=(1, 1, 286, 376), chunktype=numpy.ndarray>
Coordinates: (12/15)
  * band                         (band) <U6 'red' 'green' 'blue'
  * x                            (x) float64 5.441e+05 5.441e+05 ... 5.561e+05
  * y                            (y) float64 4.187e+06 4.187e+06 ... 4.175e+06
    landsat:wrs_type             <U1 '2'
    description                  <U28 'Landsat Collection 2 Level-2'
    landsat:collection_number    <U2 '02'
    ...                           ...
    view:off_nadir               int64 0
    proj:epsg                    int64 32610
    raster:bands                 object {'scale': 2.75e-05, 'nodata': 0, 'off...
    title                        (band) <U28 'Red Band' 'Green Band' 'Blue Band'
    epsg                         int64 32610
  * time                         (time) datetime64[ns] 2020-02-01 ... 2021-01-01
Attributes:
    spec:        RasterSpec(epsg=32610, bounds=(407580, 4037010, 694320, 4327...
    crs:         epsg:32610
    transform:   | 30.00, 0.00, 407580.00|\n| 0.00,-30.00, 4327230.00|\n| 0.0...
    resolution:  30

Import the Refined Image#

Now that we’ve refined our query, without overloading the memory, we can import the refined time-series. This will allow us to do further calculations with it on our CryoCloud instance, or even just allow us to visualize it. Let’s see what we’ve been working with!

Even though we visualized the task graph for our toy array above using the built-in visualize function, we don’t recommend running it here, as it would take too long to render.

The compute function in dask triggers the actual data pull. We can monitor the time that it takes using the dask progress bar. While this is processing, watch the memory at the bottom of the screen.

with dask.diagnostics.ProgressBar():
    data = area_of_interest.compute()
/srv/conda/envs/notebook/lib/python3.10/site-packages/numpy/lib/nanfunctions.py:1217: RuntimeWarning: All-NaN slice encountered
  r, k = function_base._ureduce(a, func=_nanmedian, axis=axis, out=out,
/srv/conda/envs/notebook/lib/python3.10/site-packages/numpy/lib/nanfunctions.py:1217: RuntimeWarning: All-NaN slice encountered
  r, k = function_base._ureduce(a, func=_nanmedian, axis=axis, out=out,
/srv/conda/envs/notebook/lib/python3.10/site-packages/numpy/lib/nanfunctions.py:1217: RuntimeWarning: All-NaN slice encountered
  r, k = function_base._ureduce(a, func=_nanmedian, axis=axis, out=out,
/srv/conda/envs/notebook/lib/python3.10/site-packages/numpy/lib/nanfunctions.py:1217: RuntimeWarning: All-NaN slice encountered
  r, k = function_base._ureduce(a, func=_nanmedian, axis=axis, out=out,

Notice how the memory fluctuated, and at times was much higher than the final memory. dask still needed to bring all the data into our cloud instance, but it only needed to hold the data from our initial query in memory for as long as it took to do our calculations. We are now left with a much smaller slice than our initial query, but the slice that we wanted: a monthly-averaged Landsat time series of a specific area on the map. Now that the data we wanted all along is in our CryoCloud RAM, let’s visualize it.

First we’ll take a look at the data. It’s still an xarray, but we can see that the data is present, and it is no longer held in chunks. Since we know that Landsat doesn’t pass over this location every month, we’ll eliminate the time slices that have no data in them.

data = data.dropna(dim='time', how='all')
data
<xarray.DataArray 'stackstac-6f74f18ec9af45035c661e3ac7f51d1b' (time: 7,
                                                                band: 3,
                                                                y: 400, x: 400)>
array([[[[0.1191375 , 0.0971925 , 0.1134175 , ..., 0.0223925 ,
          0.0194775 , 0.0163975 ],
         [0.076595  , 0.0718375 , 0.08809   , ..., 0.0228875 ,
          0.020715  , 0.0174975 ],
         [0.067025  , 0.0672175 , 0.0660075 , ..., 0.022475  ,
          0.02154   , 0.019175  ],
         ...,
         [0.0282775 , 0.034025  , 0.035895  , ..., 0.106845  ,
          0.1058    , 0.129725  ],
         [0.0482425 , 0.036665  , 0.03529   , ..., 0.1178175 ,
          0.10767   , 0.109265  ],
         [0.068125  , 0.045575  , 0.0360325 , ..., 0.1108325 ,
          0.0946075 , 0.070105  ]],

        [[0.10855   , 0.095955  , 0.1162775 , ..., 0.0500575 ,
          0.0477475 , 0.0460425 ],
         [0.076265  , 0.0768975 , 0.0904    , ..., 0.0495625 ,
          0.04871   , 0.0460975 ],
         [0.0695    , 0.0739275 , 0.0703525 , ..., 0.0491775 ,
          0.0490125 , 0.0480225 ],
...
         [0.03978625, 0.05498   , 0.04838   , ..., 0.1190275 ,
          0.114765  , 0.1190275 ],
         [0.0441725 , 0.047445  , 0.04572625, ..., 0.0853675 ,
          0.1276075 , 0.1109975 ],
         [0.056135  , 0.044585  , 0.0465375 , ..., 0.08378625,
          0.0921875 , 0.07361125]],

        [[0.062075  , 0.0539075 , 0.0497825 , ..., 0.04431   ,
          0.04431   , 0.0409275 ],
         [0.0552275 , 0.04827   , 0.054705  , ..., 0.0489025 ,
          0.04431   , 0.0409275 ],
         [0.0515425 , 0.049425  , 0.0492875 , ..., 0.0489025 ,
          0.0400475 , 0.0409275 ],
         ...,
         [0.02517   , 0.02801625, 0.0305875 , ..., 0.1034625 ,
          0.1034625 , 0.1067075 ],
         [0.0311925 , 0.026105  , 0.02826375, ..., 0.07764   ,
          0.1067075 , 0.09953   ],
         [0.039305  , 0.02561   , 0.02517   , ..., 0.07759875,
          0.085285  , 0.05994375]]]])
Coordinates: (12/15)
  * band                         (band) <U6 'red' 'green' 'blue'
  * x                            (x) float64 5.441e+05 5.441e+05 ... 5.561e+05
  * y                            (y) float64 4.187e+06 4.187e+06 ... 4.175e+06
    landsat:wrs_type             <U1 '2'
    description                  <U28 'Landsat Collection 2 Level-2'
    landsat:collection_number    <U2 '02'
    ...                           ...
    view:off_nadir               int64 0
    proj:epsg                    int64 32610
    raster:bands                 object {'scale': 2.75e-05, 'nodata': 0, 'off...
    title                        (band) <U28 'Red Band' 'Green Band' 'Blue Band'
    epsg                         int64 32610
  * time                         (time) datetime64[ns] 2020-02-01 ... 2020-12-01
Attributes:
    spec:        RasterSpec(epsg=32610, bounds=(407580, 4037010, 694320, 4327...
    crs:         epsg:32610
    transform:   | 30.00, 0.00, 407580.00|\n| 0.00,-30.00, 4327230.00|\n| 0.0...
    resolution:  30

We’ll use xarray’s native plotting function to visualize the data.

data.plot.imshow(row='time', rgb='band', robust=True, size=6);
Figure: monthly RGB composites of the area of interest, one panel per time step (dask_for_geoscientists_60_0.png).

Running Complex Python Functions with Dask + Xarray#

In the previous section we showed how dask can be used to pull data from the cloud while refining your search ahead of time to the data and resolutions that you are interested in. In this section we’ll see how dask can also be used to run some more complex analyses before pulling the data into memory.

dask does have a native package for interacting with images called dask_image, but it can be tricky to use with streaming data in chunks, which makes it difficult to use on large images stored elsewhere in the cloud. We will instead write some custom functions to be used on our dask data, so that we can import our data already transformed.

Implementing Custom Functions with Dask#

Let’s start by looking at the normalized difference vegetation index (NDVI). In the cryosphere, this isn’t likely to be an index that we use often, but it’s a great stand-in for how we can create custom functions in dask. This same principle could be applied to pull in our data as band ratios, or to apply other preprocessing functions that we might be interested in. Similarly to above, we’ll limit this to our area of interest.

area_of_interest = stack.loc[..., y_utm+buffer:y_utm-buffer, x_utm-buffer:x_utm+buffer]
area_of_interest
<xarray.DataArray 'stackstac-6f74f18ec9af45035c661e3ac7f51d1b' (time: 183,
                                                                band: 5,
                                                                y: 400, x: 400)>
dask.array<getitem, shape=(183, 5, 400, 400), dtype=float64, chunksize=(1, 1, 400, 400), chunktype=numpy.ndarray>
Coordinates: (12/27)
  * time                         (time) datetime64[ns] 2020-02-06T18:51:55.12...
    id                           (time) <U31 'LC08_L2SP_045033_20200206_02_T1...
  * band                         (band) <U6 'red' 'green' ... 'nir08' 'swir16'
  * x                            (x) float64 5.441e+05 5.441e+05 ... 5.561e+05
  * y                            (y) float64 4.187e+06 4.187e+06 ... 4.175e+06
    landsat:wrs_type             <U1 '2'
    ...                           ...
    view:off_nadir               int64 0
    landsat:cloud_cover_land     (time) float64 0.19 0.0 0.94 ... 3.0 0.0 5.41
    proj:epsg                    int64 32610
    raster:bands                 object {'scale': 2.75e-05, 'nodata': 0, 'off...
    title                        (band) <U28 'Red Band' ... 'Short-wave Infra...
    epsg                         int64 32610
Attributes:
    spec:        RasterSpec(epsg=32610, bounds=(407580, 4037010, 694320, 4327...
    crs:         epsg:32610
    transform:   | 30.00, 0.00, 407580.00|\n| 0.00,-30.00, 4327230.00|\n| 0.0...
    resolution:  30
NDVI = (area_of_interest.sel(band='swir16') - 
        area_of_interest.sel(band='nir08')) / (area_of_interest.sel(band='swir16') + 
                                               area_of_interest.sel(band='nir08'))
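
The same pattern generalizes to any normalized band ratio. A hypothetical helper (the function name and the NDWI-style example are ours, not part of the original tutorial) that stays lazy exactly like the NDVI calculation above:

def normalized_difference(dask_arr, band_a, band_b):
    # builds a lazy (band_a - band_b) / (band_a + band_b) ratio from the stackstac DataArray
    a = dask_arr.sel(band=band_a)
    b = dask_arr.sel(band=band_b)
    return (a - b) / (a + b)

# for example, a lazy NDWI-style water index from the green and near-infrared bands
ndwi = normalized_difference(area_of_interest, 'green', 'nir08')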

Perhaps we are also interested in obtaining an average NDVI across the entire year. As before, we can write this calculation before computing so that we only keep the data that we’re interested in persistent in memory. The persist function is an alternative to calling compute: it triggers the computation and keeps the results in memory so that subsequent computations can reuse these stored values. This can be useful if we intend to repeat a calculation.

with dask.diagnostics.ProgressBar():
    av_NDVI = NDVI.mean(dim='time').persist()
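
Because av_NDVI is now persisted, follow-up reductions reuse the chunks already held in memory instead of re-streaming the Landsat data. A quick illustrative check (exact values and timings will depend on your cluster):

print(float(av_NDVI.min().compute()), float(av_NDVI.max().compute()))  # fast: operates on the persisted chunks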

We can visualize this using xarray’s built-in plotting functionality.

av_NDVI.plot.imshow(vmin=-0.3, vmax=0.3, cmap='RdBu_r')
<matplotlib.image.AxesImage at 0x7f02f0db2080>
Figure: yearly mean NDVI map of the area of interest (dask_for_geoscientists_68_1.png).

Using scikit-image with Dask for Quick Segmentation#

What if we want to do something more complicated, like something that requires functions from scikit-image? We can also pair scikit-image functionality with dask arrays. First we’ll take the yearly median of the luminance of our data.

def luminance(dask_arr):
    result = ((dask_arr.sel(band='red') * 0.2125) +
              (dask_arr.sel(band='green') * 0.7154) +
              (dask_arr.sel(band='blue') * 0.0721))
    return result
lum = luminance(area_of_interest)
av_lum = lum.median(dim='time')
av_lum
<xarray.DataArray 'stackstac-6f74f18ec9af45035c661e3ac7f51d1b' (y: 400, x: 400)>
dask.array<nanmedian, shape=(400, 400), dtype=float64, chunksize=(302, 302), chunktype=numpy.ndarray>
Coordinates: (12/14)
  * x                            (x) float64 5.441e+05 5.441e+05 ... 5.561e+05
  * y                            (y) float64 4.187e+06 4.187e+06 ... 4.175e+06
    landsat:wrs_type             <U1 '2'
    description                  <U28 'Landsat Collection 2 Level-2'
    landsat:collection_number    <U2 '02'
    landsat:correction           <U4 'L2SP'
    ...                           ...
    view:off_nadir               int64 0
    proj:epsg                    int64 32610
    raster:bands                 object {'scale': 2.75e-05, 'nodata': 0, 'off...
    epsg                         int64 32610
    band                         <U6 'blue'
    title                        <U28 'Blue Band'
av_lum.plot.imshow(cmap='Greys_r')
<matplotlib.image.AxesImage at 0x7f02f0598fd0>
Figure: yearly median luminance of the area of interest (dask_for_geoscientists_74_1.png).

We still haven’t called compute on our data, so this is still operating as a lazy load: even though dask had to bring the data into memory to render the plot, it didn’t hold it there; it produced the image and then released the data from memory.

We are about to switch over to using a function from another package, scikit-image, and this function will require pulling the data into memory in order to run a more complex computation. This means that it is important to have enough memory for the function to store whatever copies of the data it needs while it is computing, even though only the output will be kept in memory.

from skimage.filters import gaussian
def apply_gaussian(arr):
    return gaussian(arr, sigma=(0, 0.1), preserve_range=True, mode='reflect')
smoothed = xr.apply_ufunc(apply_gaussian, av_lum,
                          dask='allowed', output_dtypes=[np.float64],
                          input_core_dims=[['x', 'y']], output_core_dims=[['x', 'y']])
np.mean(smoothed)
<xarray.DataArray 'stackstac-6f74f18ec9af45035c661e3ac7f51d1b' ()>
array(0.07944183)
Coordinates:
    landsat:wrs_type             <U1 '2'
    description                  <U28 'Landsat Collection 2 Level-2'
    landsat:collection_number    <U2 '02'
    landsat:correction           <U4 'L2SP'
    landsat:collection_category  <U2 'T1'
    gsd                          int64 30
    view:off_nadir               int64 0
    proj:epsg                    int64 32610
    raster:bands                 object {'scale': 2.75e-05, 'nodata': 0, 'off...
    epsg                         int64 32610
    band                         <U6 'blue'
    title                        <U28 'Blue Band'

Once we have run a gaussian filter on our data, we can apply a simple threshold to our images for a basic segmentation. This will not be as nice as segmentation done with more complex computer vision methods, but it can be useful for finding easily delineated objects.

absolute_threshold = smoothed > 0.175
absolute_threshold.plot.imshow(cmap='Greys_r')
<matplotlib.image.AxesImage at 0x7f02f066d060>
Figure: thresholded luminance image, a basic segmentation (dask_for_geoscientists_82_1.png).

If we wanted to run machine learning models using scikit-learn, we could apply the same principle. The really beautiful thing about using dask for these tasks is that our initial dataset was estimated to be >600 GB, and we were able to do these relatively high-level tasks while streaming in only the 20.22 GB that we needed. If we needed to be careful, we could be even more specific about how we implement dask to ensure that we are incredibly efficient with our persistent memory usage.

For a deeper dive into using dask with an applied Cryosphere project, head to Jonny Kingslake’s tutorial on using ARCO data with zarr and dask.