Getting oriented in the RAPIDS distributed ML ecosystem, part 1: ETL

For a long time, being a data scientist who worked with large datasets and/or models meant mastering two sets of tools, one for local work and one for "big data". pandas, numpy, and scikit-learn make it easy to work on your local machine, but can’t handle anything too big to fit in RAM. Once data gets too big, or training too costly, you have to move to a "big data" tool that pools the resources of several machines to get the job done. This traditionally meant Apache Spark, which, though powerful, requires learning a brand-new API and maybe even a brand-new language (performant Spark code is written in Scala).

Enter Dask. Dask is a distributed ETL tool that’s tightly integrated into the Python data science ecosystem. Dask is extremely popular among data scientists because its core API is a subset of the pandas, numpy, and scikit-learn APIs. This flattens the learning curve considerably: most Pythonistas can be productive with Dask almost immediately.

As part of its RAPIDS initiative, NVIDIA is going one step further, partnering with the community to build an ecosystem for distributed data science on GPUs on top of Dask. Their new cudf Python package already boasts some pretty impressive results — like this one from Capital One Labs showing an order-of-magnitude speedup for an internal ETL job that was previously being run on CPU:

This blog post, the first of two exploring this emerging ecosystem, is an introduction to distributed ETL using the dask, cudf, and dask_cudf APIs. We build the following mental map of the ecosystem:

Note that this post assumes familiarity with the Python data science ecosystem.

Distributed ETL on CPU with Dask

ETL, aka extract-transform-load, is well-accepted terminology for the speculative, exploratory phase that precedes pretty much any data analysis, data science, or machine learning project. In data science, ETL is traditionally performed using the NumPy and Pandas libraries in a Jupyter notebook. These three tools form the backbone of the modern “data science stack”.

However, these libraries were only ever intended to work with datasets small enough to fit in memory. A good rule of thumb for pandas is that you need 5x the size of a dataset in RAM to work with it comfortably. So if your machine has 16 GB of RAM, a dataset of around 3 GB is about the largest you can comfortably fit before you start running into problems (you might be able to go slightly bigger if you’re careful about how you use memory).

To comfortably work with any dataset larger than that ("big data"), you need to step up to a so-called cluster computing framework. Cluster computing frameworks distribute the work required among many machines: your RAM becomes the sum of all the machines' RAM, your FLOPS the sum of all their FLOPS. The two cluster computing frameworks best known in the Python community are Apache Spark and Dask (see the JetBrains 2019 Developer Survey for some numbers).

If you are working on a greenfield machine learning project, I recommend choosing Dask over Spark for your cluster computing framework.

While the folks behind RAPIDS have indicated that they intend to support both Dask and Spark eventually, for a variety of reasons they have prioritized Dask as their target platform for now. If machine learning pipelines are your primary use case, you want to write maximally performant code, and you don’t have an existing Spark cluster tying you down, Dask is the way to go. :)

Let’s now take a quick look at an example of Dask in action. conda install -c conda-forge dask distributed scikit-learn and you can run the following example script:

from dask.distributed import Client, LocalCluster
import dask.dataframe as dd
from sklearn.datasets import make_classification

# create a local dask cluster and a client connected to it
cluster = LocalCluster()
client = Client(cluster)

# create an example dataset and transform it into dask collections
X_train, y_train = make_classification(
    n_features=2, n_redundant=0, n_informative=2,
    random_state=1, n_clusters_per_class=1, n_samples=1000
)
X_train, y_train = dd.from_array(X_train), dd.from_array(y_train)

# compute a trivial sum
X_train.sum().compute()

This script creates a distributed Dask cluster on your local machine (despite the use of dask.distributed, all of the nodes run on your local machine, so it’s not truly distributed), generates some fake data, converts it into a dask.dataframe, and computes its sum.

The great thing about Dask is that its dataframe API closely mirrors the pandas API. Many of the functions you are familiar with from pandas continue to work the same way they did before. Under the hood, Dask partitions the dataset into memory blocks. Each block is a pandas DataFrame containing a disjoint chunk of the overall dataset. Here’s a diagram from the Dask documentation explaining how it works:

When you call the "Dask-ified" version of a DataFrame function, Dask has each machine in the cluster compute a partial result on the data blocks it has locally. These results are then sent back to the primary node, combined, and returned to the end user.
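To make this concrete, here’s a minimal sketch (with a made-up toy dataset) showing that each block really is an ordinary pandas DataFrame, and that a reduction like sum combines per-block partial results:

import pandas as pd
import dask.dataframe as dd

# a small pandas DataFrame, split into two four-row blocks
pdf = pd.DataFrame({'x': range(8)})
ddf = dd.from_pandas(pdf, npartitions=2)

# each block is an ordinary pandas DataFrame
print(type(ddf.get_partition(0).compute()))  # <class 'pandas.core.frame.DataFrame'>

# sum() computes a partial sum on each block, then combines the results
print(ddf.x.sum().compute())  # 28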

Of course, the pandas API is huge, and Dask does not attempt to implement every single DataFrame feature; it provides a subset of the pandas API. The more popular methods are there, as are the easy-to-implement primitives.

The most up-to-date reference on what’s in and what’s not is the Dask DataFrame API Reference. All of the pandas mainstays are there: assign, apply, groupby, loc, iloc, resample, rolling, merge, join, astype. Even some more exotic functions, like melt and pipe, have been implemented.
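As a quick illustration, here’s a sketch (reusing the toy ddf from the example above) of a few of these pandas idioms running unchanged on a Dask DataFrame:

# assign, groupby, and mean work just as they do in pandas;
# only the trailing .compute() call is Dask-specific
result = (
    ddf.assign(y=ddf.x * 2)
       .groupby('y')
       .x.mean()
       .compute()
)
print(result.head())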

To get your hands dirty with Dask yourself, I recommend checking out Dask’s SciPy 2020 Tutorial.

Distributed ETL on GPU

Now that we know how we would run distributed ETL jobs on CPU, let’s take a look at the GPU side of things.

NVIDIA's cudf library provides two related DataFrame APIs: a cudf.DataFrame for single GPUs, and a dask_cudf.DataFrame for ETL across GPUs. For installation instructions, refer to the RAPIDS Getting Started page — on my test machine conda install -c rapidsai -c nvidia -c conda-forge -c defaults cudf=0.14 python=3.7 cudatoolkit=10.1 did the trick. Here’s an example cudf call demonstrating the API:

import cudf
import dask_cudf

# a single-GPU dataframe...
df = cudf.DataFrame(
    {'a': list(range(20)),
     'b': list(reversed(range(20))),
     'c': list(range(20))}
)
# ...sharded across GPUs in two partitions
ddf = dask_cudf.from_cudf(df, npartitions=2)
print(ddf.loc[0:5].compute())
   a   b  c
0  0  19  0
1  1  18  1
2  2  17  2
3  3  16  3
4  4  15  4
5  5  14  5

Assuming you have a dask.distributed cluster up and running, a cudf.DataFrame can be sharded among GPUs by transforming it into a dask_cudf.DataFrame. The opposite is also true: a dask_cudf.DataFrame can be gathered back onto a single GPU. Compute operations return a DataFrame reduced onto one GPU by default: if you call compute on a dask_cudf.DataFrame, expect to get a cudf.DataFrame back out.
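Here’s a quick sketch verifying that round trip, continuing from the ddf defined above:

# compute() gathers the distributed partitions back onto a single GPU
gathered = ddf.compute()
print(type(gathered))  # <class 'cudf.core.dataframe.DataFrame'>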

As you might have noticed from the name, dask_cudf is tightly coupled to dask. Under the hood, Dask is still handling task scheduling and execution. The main difference is that the memory blocks it uses are now cudf.DataFrame objects, not pandas.DataFrame ones.

However, while cudf and dask both model their APIs on pandas, the subsets they implement are somewhat different. cudf is also a whole lot newer than Dask, which makes for a significantly smaller API. And dask_cudf, which is newer still, is even more limited in terms of the operations it can perform.

Take apply for example. This is one of the most important functions in pandas, but it’s not directly present in dask_cudf. Instead you use map_partitions, which takes a function as input and maps it over the raw memory blocks:

# map_partitions applies the function to each cudf.DataFrame block in turn
ddf.map_partitions(lambda df: df + 1).compute()

This RDD-like API is workable when you want to compute something row-wise on the raw dataframe, but it really limits what you can do after a groupby or rolling operation. It’s also notable that cudf’s apply_rows, being GPU-accelerated, accepts a more limited set of function signatures than the apply method in pandas or dask: the function must be JIT-able by numba.
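For illustration, here’s a minimal sketch of an apply_rows call (the column names and kernel here are made up for this example):

import numpy as np
import cudf

df = cudf.DataFrame({'a': [1, 2, 3], 'b': [10, 20, 30]})

# the kernel must be numba-JIT-able: plain loops and arithmetic
# over the column arrays, no arbitrary Python objects
def add_kernel(a, b, out):
    for i, (x, y) in enumerate(zip(a, b)):
        out[i] = x + y

df = df.apply_rows(
    add_kernel,
    incols=['a', 'b'],
    outcols={'out': np.float64},
    kwargs={}
)
print(df.head())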

For a deeper dive into cudf, I recommend checking out 10 Minutes to cudf and Dask-cuDF.

Benchmarks

In summary: dask is stable and feature-complete, while cudf is bleeding-edge and incomplete, but developing rapidly.

The foibles of working on the bleeding edge are potentially very much worth it if the speedups from moving from CPU to GPU are big enough.

For a rule-of-thumb estimate of how much of a performance improvement dask_cudf offers over dask alone, I used Spell to launch a basic computational job on three different instances:

  • cpu-huge, aka c5.18xlarge, one of the biggest CPU-only instances you can get on AWS
  • v100x4 (p3.8xlarge), a GPU server with four V100s (the most powerful cloud GPU currently available on AWS) onboard
  • t4x4 (g4dn.12xlarge), a GPU server with four T4 GPUs onboard (the most recent and most cost-effective cloud GPU available, though less powerful than the V100)

The objective was to perform a value_counts() operation on the passenger_count column in a 16 GB dataset of taxi trips (to learn more about this dataset check out my previous blog post, "Getting started with large-scale ETL jobs using Dask and AWS EMR"). While this is not really a robust benchmark, it’s good enough to get a sense of the speedup that cudf can offer.

On a giant CPU instance, using dask, this operation (which requires scanning through the entire dataset) took just over 1 second to perform. First, reading the data into memory:

from dask.distributed import Client, LocalCluster
import dask.dataframe as dd

# spin up a local cluster and read the dataset into memory
cluster = LocalCluster()
client = Client(cluster)
df = dd.read_parquet('/mnt/data/2019-taxi-dataset/')

Then running the operation:

%%time
# cpu-huge
df.passenger_count.value_counts().compute()

CPU times: user 3.16 s, sys: 1.76 s, total: 4.92 s
Wall time: 1.08 s

Running this same computation on GPU using dask_cudf resulted in a 10x speedup.

We start by reading the data into GPU memory and partitioning it into four pieces (this happens to be the most efficient partitioning scheme for this particular combination of dataset and operation):

from dask_cuda import LocalCUDACluster
import dask_cudf
from dask.distributed import Client, wait

# one dask worker per GPU
cluster = LocalCUDACluster()
client = Client(cluster)

# read the data into GPU memory, repartition it four ways,
# and persist it on the workers before timing anything
ddf = dask_cudf.read_parquet('/mnt/data/2019-taxi-dataset/')
ddf = ddf.repartition(npartitions=4)
ddf = ddf.persist()
wait(ddf)

We then run the code samples:

%%time
# t4x4
ddf.passenger_count.value_counts().compute()

CPU times: user 17.5 ms, sys: 39 µs, total: 17.6 ms
Wall time: 93.4 ms

%%time
# v100x4
ddf.passenger_count.value_counts().compute()

CPU times: user 28.1 ms, sys: 0 ns, total: 28.1 ms
Wall time: 93.1 ms

This is a huge speedup, consistent (in miniature) with the order-of-magnitude speedups Capital One Labs saw in their benchmarks. Reducing the runtime of an important ETL pipeline task from 60 minutes to 6 minutes is a huge boon for developer productivity, and it can enable you to build more complex and useful processing pipelines that would have been computationally intractable on CPU. All good things when you’re in the business of turning data into insights!

That’s all for now. Stay tuned for a future blog post exploring how dask_ml and cuml enable you to go one step further by distributing your model training and model scoring, too.
