Getting oriented in the RAPIDS distributed ML ecosystem, part 2: training and scoring

RAPIDS is an ongoing effort, spearheaded by NVIDIA, to accelerate analytics and machine learning (ML) workflows on GPUs. The most exciting thing about RAPIDS is the speedups that GPU compute brings to common ML workflows. As a brief tour through the RAPIDS blog shows, speedups on the order of 10x or 100x are commonplace. However, taking advantage of these requires learning the new tools (like Dask and RAPIDS cuDF) that are core to this ecosystem.

In my previous article, Getting oriented in the RAPIDS distributed ML ecosystem, part 1: ETL, I provided a high-level overview of ETL workflows on RAPIDS — how they work and what they look like. In this follow-up article I will do the same for ML model training and scoring. We will take a look at the APIs, learn how they work, and run a simple performance benchmark showing just how good a fit RAPIDS can be for your use case.

This blog post assumes familiarity with the Dask and RAPIDS cuDF projects. For background, I recommend reading or skimming my part 1 article!

Distributed Model Training

Large-scale ML model training jobs can be classified into one of two buckets based on where they bottleneck. Some training jobs are memory-bound: they need to be trained on a dataset too large to fit into RAM on a single machine.

In other scenarios, the dataset does fit into memory, and it's the computational power of the machine that's the bottleneck. In this case, we say that the ML job is compute-bound.

You deal with a model training bottleneck by scaling your compute, and which form of bottleneck you are facing matters when you do. The training algorithm must be implemented in such a way that its work can be partitioned into discrete parts, and the methodology for doing so varies from technique to technique. Some machine learning techniques are much easier to scale than others!

To understand the implication this has for you, the end-user, let's break this scaling problem into cases. Here's a visual summary of what we'll cover:

For CPU workloads using scikit-learn that are compute-bound

scikit-learn uses a library called joblib for job parallelization internally.

Dask integrates with joblib, giving joblib access to all of the CPU cores in the Dask cluster. Since scikit-learn is tightly coupled with joblib, this means that every learner in the library can be parallelized the same way — by using the joblib.parallel_backend("dask") context manager.

Here’s a simple code sample, taken from the Dask documentation, of a parallelized randomized hyperparameter search over a support vector classifier:

# train.py
import numpy as np
from dask.distributed import Client

import joblib
from sklearn.datasets import load_digits
from sklearn.model_selection import RandomizedSearchCV
from sklearn.svm import SVC

client = Client(processes=False)
digits = load_digits()

param_space = {
    'C': np.logspace(-6, 6, 13),
    'gamma': np.logspace(-8, 8, 17),
    'tol': np.logspace(-4, -1, 4),
    'class_weight': [None, 'balanced'],
}

model = SVC(kernel='rbf')
search = RandomizedSearchCV(model, param_space, cv=3, n_iter=50, verbose=10)

with joblib.parallel_backend('dask'):
    search.fit(digits.data, digits.target)

Note that not all scikit-learn learners parallelize equally well under joblib! Some learners, like RandomForestClassifier, are embarrassingly parallel across their sub-estimators and expose an n_jobs parameter that controls how many cores they use. Others, like LogisticRegression, support n_jobs only in certain configurations. You will need to check whether or not your estimator of choice supports parallel training on a case-by-case basis.
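For estimators that take this route, enabling parallelism is a one-liner. A minimal sketch (my own, not from the Dask docs):

from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression

# n_jobs=-1 asks joblib for every core it can see; run inside the
# joblib.parallel_backend('dask') context and, in principle, that
# means the cores of the Dask cluster rather than the local machine
forest = RandomForestClassifier(n_estimators=500, n_jobs=-1)
logit = LogisticRegression(n_jobs=-1)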

For CPU workloads using scikit-learn that are memory-bound

In this scenario, the algorithm you use must support online learning (also sometimes called incremental learning). In scikit-learn, learners that have this property implement the partial_fit API. Not all algorithms support this — in fact, most don’t (check the documentation for a complete list).
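To make the partial_fit contract concrete, here's a minimal sketch (mine, not from the scikit-learn docs) of batch-at-a-time training:

import numpy as np
from sklearn.linear_model import SGDClassifier

model = SGDClassifier()
rng = np.random.default_rng(0)

# stand-in for streaming successive chunks of a larger-than-RAM dataset
for _ in range(10):
    X_batch = rng.normal(size=(1_000, 20))
    y_batch = rng.integers(0, 2, size=1_000)
    # partial_fit updates the model in place; classes is passed up front
    # because no single batch is guaranteed to contain every label
    model.partial_fit(X_batch, y_batch, classes=[0, 1])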

For algorithms which do implement partial_fit, the dask_ml package provides a dask_ml.wrappers.Incremental wrapper. Wrap your estimator in it, and Dask will handle streaming the blocks of your (potentially larger-than-memory) training data through partial_fit for you. Again, here’s a code sample taken from the docs showcasing this in action:

from dask_ml.wrappers import Incremental
from dask_ml.datasets import make_classification
import sklearn.linear_model

# X and y are Dask arrays split into 25-row blocks
X, y = make_classification(chunks=25)
est = sklearn.linear_model.SGDClassifier()

# Incremental passes each block to est.partial_fit in turn
clf = Incremental(est, scoring='accuracy')
clf.fit(X, y, classes=[0, 1])

In addition to Incremental, the dask_ml package also includes a few scikit-learn algorithms that the Dask team has re-implemented from scratch in a partition-friendly way: KMeans, SpectralClustering, LinearRegression, LogisticRegression, and PoissonRegression.
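As a quick illustration, here's a minimal sketch (not from the dask_ml docs) using one of those re-implementations, dask_ml.cluster.KMeans, on a chunked Dask array:

from dask_ml.cluster import KMeans
from dask_ml.datasets import make_blobs

# X is a Dask array of 10,000 points split into ten 1,000-row blocks
X, _ = make_blobs(n_samples=10_000, chunks=1_000)

# this KMeans works on the blocks directly, with no need to pull
# the whole array into a single process
km = KMeans(n_clusters=3)
km.fit(X)
print(km.cluster_centers_)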

For GPU workloads using scikit-learn

Sadly, scikit-learn only works on CPU. Luckily, the RAPIDS project has already ported many of its most popular algorithms to GPU for you!

The RAPIDS cuml package contains implementations for many common scikit-learn estimators. These are divided into two sets: one in the cuml.* namespace (e.g. cuml.LinearRegression), and one in the cuml.dask.* namespace (e.g. cuml.dask.linear_model.LinearRegression).

As you might have guessed, the algorithms in the cuml.* namespace are single-GPU: training can be performed on one GPU only. You can still do your data preparation with Dask DataFrames split across many machines, but all of that data will need to be funneled down to a single GPU at train time, which is potentially a very slow operation. Code sample:

from cuml import LinearRegression
from cuml.datasets import make_regression

# everything here lives on a single GPU
X, y = make_regression()
model = LinearRegression()
model.fit(X, y)
y_pred = model.predict(X)
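To make the "funneling" concrete: if your features live in a distributed dask_cudf DataFrame, it's the compute() call that pulls every partition back onto one GPU before the single-GPU fit. A minimal sketch of my own, with toy data standing in for a real dask_cudf workload:

import cudf
import dask_cudf
import numpy as np
from cuml import LinearRegression

# a toy distributed frame; in practice this might come from dask_cudf.read_csv
rng = np.random.default_rng(0)
gdf = cudf.DataFrame({
    'x0': rng.normal(size=1_000),
    'x1': rng.normal(size=1_000),
})
gdf['y'] = 2.0 * gdf['x0'] - gdf['x1']
ddf = dask_cudf.from_cudf(gdf, npartitions=4)

# compute() funnels every partition back onto a single GPU...
local = ddf.compute()

# ...where the single-GPU estimator can train on it
model = LinearRegression()
model.fit(local[['x0', 'x1']].astype('float32'), local['y'].astype('float32'))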

The algorithms in the cuml.dask.* namespace are multi-GPU. Building a working machine learning algorithm that can train across data partitions is just as hard on a GPU as it is on a CPU (if not harder), so multi-GPU algorithms exist for only a relatively limited subset of the scikit-learn API (refer to the API docs for a list). Code sample:

from dask.distributed import Client
from dask_cuda import LocalCUDACluster
from cuml.dask.linear_model import LinearRegression
from cuml.dask.datasets.regression import make_regression

# cuml.dask estimators expect a Dask cluster with one worker per GPU
cluster = LocalCUDACluster()
client = Client(cluster)

# X and y come back partitioned across the cluster's GPUs
X, y = make_regression()
model = LinearRegression()
model.fit(X, y)
y_pred = model.predict(X)

For workloads using xgboost

Best-in-class classifier performance on tabular datasets comes from using a gradient boosted tree algorithm. xgboost supports distributed, memory-bound training on both CPU and GPU out of the box, coordinating workers with its internal communication framework, Rabit, and it ships first-party Dask integration in the xgboost.dask module.

As a result, everything that you need to know to train an xgboost model on a CPU Dask cluster is in the following code sample:

import xgboost as xgb
import dask.array as da
import dask.distributed

cluster = dask.distributed.LocalCluster(
    n_workers=4, threads_per_worker=1
)
client = dask.distributed.Client(cluster)

# any Dask-backed data works here; random arrays keep the sample self-contained
X = da.random.random((100_000, 20), chunks=(10_000, 20))
y = da.random.random((100_000,), chunks=10_000)

dtrain = xgb.dask.DaskDMatrix(client, X, y)
output = xgb.dask.train(
    client, {'verbosity': 2, 'tree_method': 'hist'},
    dtrain, num_boost_round=4, evals=[(dtrain, 'train')]
)

To train an xgboost model on a GPU cluster instead, swap the LocalCluster out for a dask_cuda LocalCUDACluster. After that, you need only change a single line of code: set the tree_method parameter to gpu_hist:

from dask_cuda import LocalCUDACluster

cluster = LocalCUDACluster()
client = dask.distributed.Client(cluster)
dtrain = xgb.dask.DaskDMatrix(client, X, y)
output = xgb.dask.train(
    client, {'verbosity': 2, 'tree_method': 'gpu_hist'},
    dtrain, num_boost_round=4, evals=[(dtrain, 'train')]
)

Distributed Model Scoring

Model scoring (also known as batch inference) is the task of generating model predictions on a dataset of interest. If the dataset being scored is very large, you run into the same scaling problems (memory and/or compute bottlenecks) as in training.

The difference is that model scoring, unlike model training, is embarrassingly parallel. It is trivially easy to stand up multiple copies of a model and have them predict on different slices of the test dataset simultaneously. This makes things much easier.

Let’s once again break this task down into specific cases.

For workloads using scikit-learn

dask_ml provides a ParallelPostFit wrapper that, when applied to a scikit-learn estimator, splits up calls to the predict or predict_proba methods across the cluster. Continuing our earlier example:

# from dask_ml.wrappers import Incremental
# from dask_ml.datasets import make_classification
# import sklearn.linear_model
#
# X, y = make_classification(chunks=25)
# est = sklearn.linear_model.SGDClassifier()
# clf = Incremental(est, scoring='accuracy')
# clf.fit(X, y, classes=[0, 1])

from dask_ml.wrappers import ParallelPostFit

X_test, y_test = make_classification(chunks=25)
clf = ParallelPostFit(clf)
y_test_pred = clf.predict(X_test)

For GPU workloads

One really cool feature of ParallelPostFit is that it will accept any object that implements a predict and/or predict_proba method as input. cuml's single-GPU algorithms implement these methods, so you can pass a trained cuml model to it without any modifications!
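To drive the duck typing point home, here's a contrived sketch of my own; ThresholdModel is hypothetical, purely for illustration:

import numpy as np
import dask.array as da
from dask_ml.wrappers import ParallelPostFit

class ThresholdModel:
    """A hypothetical 'model': any object with a predict method will do."""
    def predict(self, X):
        # label a row 1 if its features sum to a positive number
        return (np.asarray(X).sum(axis=1) > 0).astype(int)

wrapped = ParallelPostFit(ThresholdModel())
X_test = da.random.normal(size=(10_000, 5), chunks=(1_000, 5))
y_pred = wrapped.predict(X_test)  # lazy; each 1,000-row chunk is scored independently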

Again, extending our single-GPU cuml example from earlier:

# from cuml import LinearRegression
# from cuml.datasets import make_regression
#
# X, y = make_regression()
# model = LinearRegression()
# model.fit(X, y)

from dask_ml.wrappers import ParallelPostFit
from dask_ml.datasets import make_regression

# the test set is a chunked Dask array; each chunk is scored independently
X_test, y_test = make_regression(chunks=25)
model = ParallelPostFit(model)
y_test_pred = model.predict(X_test)

For workloads using xgboost

Calling predict on an xgboost model trained on GPU using gpu_hist will automatically perform inference on GPU as well:

# client = dask.distributed.Client(cluster)
# dtrain = xgb.dask.DaskDMatrix(client, X, y)
# output = xgb.dask.train(
#     client, {'verbosity': 2, 'tree_method': 'gpu_hist'},
#     dtrain, num_boost_round=4, evals=[(dtrain, 'train')]
# )
prediction = xgb.dask.predict(client, output, dtrain)
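One related convenience: xgb.dask.predict also accepts plain Dask collections, so scoring new data doesn't require building a second DaskDMatrix. A short sketch continuing the sample above:

# the trained booster lives in the dict returned by xgb.dask.train
booster = output['booster']

# a Dask array or DataFrame can be passed directly as the data argument
prediction = xgb.dask.predict(client, booster, X)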

A Quick Benchmark

Now that we have a good sense of what the API surface for distributed model training and scoring looks like, let’s run a simple benchmark to get a feel for the potential time savings.

I used Spell to train two linear regression models on 10 GB of fake data.

For the first (CPU-based) model I trained a dask_ml.linear_model.LinearRegression model on a dataset created using dask_ml.datasets.make_regression. I used one of the biggest CPU machines available on AWS, a c5.18xlarge instance (72 vCPUs, 144 GB RAM), for training.

For the second (GPU-based) model I trained a cuml.dask.linear_model.LinearRegression model on a cuml.dask.datasets.regression.make_regression dataset. The training was performed on an entry-level (for GPUs, at least) g4dn.xlarge instance (1 T4 GPU, 16 GB VRAM).
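For reference, here's a condensed sketch of the GPU half of this benchmark. The data-generation numbers below are illustrative stand-ins for "10 GB of float32 data"; the exact parameters live in the GitHub repo linked below:

import time
from dask.distributed import Client
from dask_cuda import LocalCUDACluster
from cuml.dask.datasets.regression import make_regression
from cuml.dask.linear_model import LinearRegression

cluster = LocalCUDACluster()
client = Client(cluster)

# ~10 GB of float32 data: 25M rows x 100 cols x 4 bytes (illustrative numbers)
X, y = make_regression(n_samples=25_000_000, n_features=100, n_parts=8)

start = time.time()
LinearRegression().fit(X, y)
print(f"fit finished in {time.time() - start:.1f}s")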

The CPU training job finished in 89 seconds. The GPU training job, by contrast, took only 5 seconds. That’s a speedup of 18x!

You can check out the code for yourself on GitHub.

Conclusion

Model training and model scoring are arguably the two most fundamental tasks in all of machine learning. Bringing these tasks onto GPU using Dask and RAPIDS has the potential to accelerate workflows using classical machine learning techniques by a factor of 20x or more.

Of course, it’s important to note that these tools are still in the early stages of development, so many scikit-learn algorithms and algorithm features do not yet have distributed or distributed-GPU equivalents. Nevertheless, APIs for doing these things exist and can be used today. If you’re looking to scale your ML jobs to large datasets, check them out!
