Orchestrating Spell model pipelines using Prefect

Keeping complex data pipelines in a healthy state is one of the core competencies of MLOps teams. One of the most popular categories of tools for managing this concern in production today is what are loosely called workflow management tools.

Workflow management tools allow you to break a data pipeline into a directed acyclic graph (or DAG) of subtasks, then schedule that DAG for execution on your backend of choice, at whatever cadence you need.
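To make the DAG idea concrete, here is a toy illustration (plain Python, not any particular workflow tool) of a three-step pipeline expressed as a dependency graph and resolved into an execution order using the standard library:

```python
# Toy illustration: a pipeline as a DAG, resolved into dependency order.
from graphlib import TopologicalSorter

# Each task maps to the set of tasks it depends on.
dag = {
    "fetch": set(),
    "clean": {"fetch"},
    "store": {"clean"},
}

# static_order yields tasks so that dependencies always come before dependents.
order = list(TopologicalSorter(dag).static_order())
print(order)  # ['fetch', 'clean', 'store']
```

A workflow manager does essentially this, plus scheduling, retries, and state tracking on top.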

The most popular workflow management tool in production use today is Apache Airflow. However, in recent years there have been a handful of new and exciting competing open source workflow management tools.

One of the most prominent of these tools, and the subject of this article, is Prefect. Prefect has generated a lot of mindshare among MLOps practitioners in the past year due to its Pythonic design and ergonomic API. Because Prefect is so much newer than Airflow, it has successfully addressed a number of pain points the older tool suffers from, while directly integrating with ecosystem tools (most prominently, Dask, which can be used as its task backend) that more developers want.

This blog post is an opinionated introduction to Prefect. We will look at some of Prefect's features and at what developers have been saying about it. We will then learn how you can interact with Spell from within Prefect.

Prefect basics

The Prefect tutorial includes the following diagram, which showcases what a simple DAG looks like in practice:

A complete DAG in Prefect is called a flow. Each individual step within a flow is called a task. Tasks have dependencies on predecessor tasks and/or (flow-level) runtime parameters passed to them by the flow creator.

In this example flow there are two parameters (airport and radius). When executing this flow, Prefect begins by executing the fetch reference data task. It then simultaneously schedules (and executes) a store reference data task and a fetch live aircraft data task, the latter of which takes the two parameters, airport and radius. The live data is consumed by a validate and clean data task, then stored by a store live aircraft data task. Then, assuming all of these tasks succeed, the flow execution is given the status success.

Let's see what this looks like in code. Here's a trivial example, again taken from the Prefect docs, of a single-task Prefect flow:

from prefect import task, Flow


@task
def add(x, y=1):
    return x + y


with Flow("My first flow!") as flow:
    first_result = add(1, y=2)
    second_result = add(x=first_result, y=100)

state = flow.run()

assert state.is_successful()

Tasks are simple Python functions optionally taking some inputs and returning some outputs (there is also an equivalent class-based API, but the functional API is considered the starting point). A decorator, @task, marks a function as a task.

A context manager, Flow, establishes that any tasks within its body are to be added to it. Prefect builds out its computational graph for the flow by looking at the functions that are executed within the flow context. Prefect creates a dependency between two tasks whenever the output of one task instance is consumed by another. In this example, add(x=first_result, y=100) has a dependency on add(1, y=2).
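This deferred-execution pattern is worth pausing on. The following is a minimal sketch (illustrative only, not Prefect's actual internals) of how a context manager plus a decorator can record calls into a dependency graph instead of executing them immediately:

```python
# Sketch of deferred execution: calls are recorded as graph nodes, not run.
_active_graph = None


class Graph:
    def __init__(self):
        self.edges = []  # (upstream_node, downstream_node) pairs

    def __enter__(self):
        global _active_graph
        _active_graph = self
        return self

    def __exit__(self, *exc):
        global _active_graph
        _active_graph = None


class Node:
    def __init__(self, fn, args):
        self.fn, self.args = fn, args


def task(fn):
    def wrapper(*args):
        node = Node(fn, args)
        # Any argument that is itself a Node creates a dependency edge.
        for arg in args:
            if isinstance(arg, Node):
                _active_graph.edges.append((arg, node))
        return node
    return wrapper


@task
def add(x, y):
    return x + y


with Graph() as g:
    first = add(1, 2)
    second = add(first, 100)

print(len(g.edges))  # 1 — the second call consumes the first call's output
```

Inside the context, `first` and `second` are graph nodes rather than numbers; nothing actually computes until the graph is executed. Prefect's real implementation is far richer, but the core trick is the same.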

Upon exiting the context manager, flow is a complete flow object that is schedulable and runnable. run is essentially a developer convenience for executing a flow on your local machine; in production, you'll want to configure it to execute on a production backend instead. However, run is extremely useful for testing the flow locally and/or in your dev or CI environment.

Here's the more complete "aircraft data" example:

from datetime import timedelta

import aircraftlib as aclib
from prefect import task, Flow, Parameter


@task
def extract_reference_data():
    print("fetching reference data...")
    return aclib.fetch_reference_data()


@task(max_retries=3, retry_delay=timedelta(seconds=10))
def extract_live_data(airport, radius, ref_data):
    # Get the live aircraft vector data around the given airport (or none)
    area = None
    if airport:
        airport_data = ref_data.airports[airport]
        airport_position = aclib.Position(
            lat=float(airport_data["latitude"]),
            long=float(airport_data["longitude"])
        )
        area = aclib.bounding_box(airport_position, radius)

    print("fetching live aircraft data...")
    raw_aircraft_data = aclib.fetch_live_aircraft_data(area=area)

    return raw_aircraft_data


@task(max_retries=3, retry_delay=timedelta(seconds=10))
def transform(raw_aircraft_data, ref_data):
    print("cleaning & transforming aircraft data...")

    live_aircraft_data = []
    for raw_vector in raw_aircraft_data:
        vector = aclib.clean_vector(raw_vector)
        if vector:
            aclib.add_airline_info(vector, ref_data.airlines)
            live_aircraft_data.append(vector)

    return live_aircraft_data


@task(max_retries=3, retry_delay=timedelta(seconds=10))
def load_reference_data(ref_data):
    print("saving reference data...")
    db = aclib.Database()
    db.update_reference_data(ref_data)


@task(max_retries=3, retry_delay=timedelta(seconds=10))
def load_live_data(live_aircraft_data):
    print("saving live aircraft data...")
    db = aclib.Database()
    db.add_live_aircraft_data(live_aircraft_data)


def main():
    with Flow("etl") as flow:
        airport = Parameter("airport", default="IAD")
        radius = Parameter("radius", default=200)

        reference_data = extract_reference_data()
        live_data = extract_live_data(airport, radius, reference_data)

        transformed_live_data = transform(live_data, reference_data)

        load_reference_data(reference_data)
        load_live_data(transformed_live_data)

    flow.run(airport="DCA", radius=10)


if __name__ == "__main__":
    main()

This example introduces only a couple of new concepts. One is retries—here, tasks are configured with max_retries and a retry_delay. The other is parameters, demarcated by their use of the Parameter wrapper. Recall that these are runtime values: they are set when the flow is scheduled or run, not when it is defined.
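To see what the retry semantics mean in practice, here is a plain-Python sketch (not Prefect's implementation) of the max_retries / retry_delay behavior: re-invoke a failing task up to N extra times, sleeping between attempts:

```python
import time


def with_retries(fn, max_retries=3, retry_delay=0.0):
    # Re-invoke fn up to max_retries extra times, sleeping between attempts.
    def wrapper(*args, **kwargs):
        for attempt in range(max_retries + 1):
            try:
                return fn(*args, **kwargs)
            except Exception:
                if attempt == max_retries:
                    raise
                time.sleep(retry_delay)
    return wrapper


calls = []


def flaky():
    # Fails twice, then succeeds on the third attempt.
    calls.append(1)
    if len(calls) < 3:
        raise RuntimeError("transient failure")
    return "ok"


result = with_retries(flaky, max_retries=3)()
print(result)  # "ok" — succeeded on the third attempt
```

In Prefect, this logic lives in the task runner, so a transient network failure in extract_live_data is retried (here, up to three times, ten seconds apart) before the task is marked failed.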

We're really skimming over the API here just to get a basic feel for it. Prefect has tons of other features, including built-in Dask scheduler support, which are out of scope for this short guide. To learn more, refer to the Prefect docs.

Prefect—what people are saying

Is Prefect the right fit for you and your team?

While nothing substitutes for real-world experience or trying it out yourself, it's helpful to look at what other developers have been saying. There has been a lot of chatter around Prefect on the mlops.community Slack. Here are some of the things MLOps engineers who have experimented with it have said there, in no particular order:

Airflow is now becoming a standard even with it’s rough edges. Prefect is better programmed and designed (having learned from Airflow’s rough edges). [...] Prefect might gain adoption but Airflow is not going away soon.

As someone who set it [Airflow] up for the first time at my company, it was a bit of a pain. [...] I think Airflow is the safe choice today, but if you’re a pure Python shop and willing to spend a little money, Prefect could be an interesting bet.

Prefect is built on top of Dask, and they share some core contributors, so we were confident in Prefect from the start. We knew these Dask foundations would lead to a stable core and a strong community – neither of which we found with Kubeflow.

Finally, we were attracted to Prefect because it’s familiar to Python engineers. It addresses many of the pain points common to more complicated tools like Airflow. Specifically, Prefect lets you turn any Python function into a task using a simple Python decorator.

We use Prefect on our team and like it. Can't really compare with the others but for the most part positive experience with Prefect. They also have a very active community and on their Slack so anytime I've run into an issue have gotten support (for free) on their Slack within a day.

If you have no other reason I would recommend prefect.io or ray to coordinate as airflow is famous for being heavyweight.

Whether Prefect, Apache Airflow, or some other tool (Dagster is another notable tool in this space) is the best fit for your use case is something only you can decide. However, I highly recommend joining the MLOps.community Slack, as there are tons of smart people there evaluating and sharing advice on technical decisions just like this one.

Ultimately it sounds like Apache Airflow is still the best choice for larger teams or projects. However, Prefect is a better choice for teams who want to be on the cutting edge, as well as those making heavy use of Dask (for anyone new to Dask, we've written about it before on this blog—see e.g. "Getting started with large-scale ETL jobs using Dask and AWS EMR").

Accessing Spell from a Prefect flow

Because Spell is a cloud machine orchestration API, it's a natural target for Prefect flows. You can use Prefect to orchestrate Spell model training runs, kick off and download the results of model hyperparameter searches, automate the deployment and testing of Spell model servers, and so on.

Here's an example Prefect flow that creates a Spell run, waits for it to complete, and then exits out:

# example_flow.py
import spell.client

from prefect import task, Task, Flow, Parameter
from prefect.utilities.tasks import defaults_from_attrs
from prefect.tasks.secrets import EnvVarSecret


class CreateSpellRun(Task):
    def __init__(self, owner, **kwargs):
        self.owner = owner
        super().__init__(**kwargs)

    @defaults_from_attrs("owner")
    def run(self, command, token, owner=None, **kwargs):
        # Build the client object
        client = spell.client.SpellClient(token=token, owner=owner)

        # Start the run and wait for it to finish
        run = client.runs.new(command=command, **kwargs)
        run.wait_status(*client.runs.FINAL)
        run.refresh()

        # If the run finished with a failed status raise a ValueError
        # so that Prefect knows to mark this task as failed.
        if run.status in [
            client.runs.FAILED,
            client.runs.BUILD_FAILED,
            client.runs.MOUNT_FAILED,
        ]:
            raise ValueError(
                f"Run #{run.id} failed with status `{run.status}`."
            )
        if run.user_exit_code != 0:
            raise ValueError(
                f"Run #{run.id} finished with nonzero exit code "
                f"{run.user_exit_code}."
            )

        return run


def main():
    create_run_in_org = CreateSpellRun(owner="spell-org")

    with Flow("example-flow") as f:
        token = EnvVarSecret("SPELL_TOKEN")
        state = create_run_in_org(command="echo Hello", token=token)

    state = f.run()
    assert state.is_successful()


if __name__ == "__main__":
    main()

This script combines features in Prefect with features in Spell. Let's step through this code to understand how it works.

This code uses a Spell access token to authenticate to the Spell API. Authenticating against the Spell API using an access token requires a token and an owner (an owner is necessary because an individual Spell user may be a member of multiple organizations, and Spell needs to know which one of those organizations you are performing the API action in). These values are passed as input to a spell.client.SpellClient object initializer:

client = spell.client.SpellClient(token=token, owner=owner)
# ...perform some actions with the client...

The token is sensitive security material and should be kept secret. The owner is not. As a result, we initialize these two values and pass them into the flow in different ways.

The token is initialized in the flow declaration using an EnvVarSecret("SPELL_TOKEN"):

with Flow("example-flow") as f:
    token = EnvVarSecret("SPELL_TOKEN")
    state = create_run_in_org(command="echo Hello", token=token)

EnvVarSecret is an example of a Prefect secret—a special task that reads in a secret value from somewhere, allowing you to pass that secret as input to downstream tasks in the flow. In this case, token is instantiated with the value of the SPELL_TOKEN environment variable, then passed into the task create_run_in_org. The token payload will be available only at runtime, limiting its exposure.
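The underlying pattern is simple: resolve the secret from the environment at runtime, never at import time. Here is a stdlib-only illustration (resolve_secret is a hypothetical helper, not part of Prefect or Spell):

```python
import os


def resolve_secret(name):
    # Resolve a secret from the environment at call time; fail loudly if unset.
    try:
        return os.environ[name]
    except KeyError:
        raise RuntimeError(f"secret {name!r} is not set") from None


os.environ["SPELL_TOKEN"] = "example-token"  # for demonstration only
print(resolve_secret("SPELL_TOKEN"))  # example-token
```

Because resolution happens when the task runs, the secret never gets baked into the serialized flow definition.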

The owner is passed in as a task class initialization variable:

class CreateSpellRun(Task):
    def __init__(self, owner, **kwargs):
        self.owner = owner
        super().__init__(**kwargs)

    @defaults_from_attrs("owner")
    def run(self, command, token, owner=None, **kwargs):
        # ...run body...

# ...

create_run_in_org = CreateSpellRun(owner="spell-org")

CreateSpellRun is a custom task class that inherits from prefect.Task. It takes an owner at __init__ time, and that owner is then made available to the run method. The run method is what gets called when create_run_in_org(...) is invoked inside the flow. Using Prefect's class-based task API here allows us to set a default owner once, in the script body. This is much less verbose than passing an owner into the run function every single time we need it.
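To make the mechanics concrete, here is a pure-Python sketch of the idea behind defaults_from_attrs (illustrative only, not Prefect's actual implementation): if a keyword argument arrives as None, fall back to the instance attribute of the same name.

```python
import functools


def defaults_from_attrs(*attr_names):
    # For each named kwarg that is None, substitute self.<name> instead.
    def decorator(method):
        @functools.wraps(method)
        def wrapper(self, *args, **kwargs):
            for name in attr_names:
                if kwargs.get(name) is None:
                    kwargs[name] = getattr(self, name)
            return method(self, *args, **kwargs)
        return wrapper
    return decorator


class CreateSpellRun:
    def __init__(self, owner):
        self.owner = owner

    @defaults_from_attrs("owner")
    def run(self, command, owner=None):
        return f"{owner}: {command}"


output = CreateSpellRun(owner="spell-org").run("echo Hello")
print(output)  # spell-org: echo Hello
```

A caller can still pass owner= explicitly to override the default for a single invocation.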

All that remains is our task body—in this case, the code that actually executes our run on Spell:

@defaults_from_attrs("owner")
def run(self, command, token, owner=None, **kwargs):
    # Build the client object
    client = spell.client.SpellClient(token=token, owner=owner)

    # Start the run and wait for it to finish
    run = client.runs.new(command=command, **kwargs)
    run.wait_status(*client.runs.FINAL)
    run.refresh()

    # If the run finished with a failed status raise a ValueError
    # so that Prefect knows to mark this task as failed.
    if run.status in [
        client.runs.FAILED,
        client.runs.BUILD_FAILED,
        client.runs.MOUNT_FAILED,
    ]:
        raise ValueError(
            f"Run #{run.id} failed with status `{run.status}`."
        )
    if run.user_exit_code != 0:
        raise ValueError(
            f"Run #{run.id} finished with nonzero exit code "
            f"{run.user_exit_code}."
        )

This code uses client.runs.new to initialize a run on Spell with the entrypoint command and whatever other kwargs (e.g. machine_type, github_url, etcetera) we also need. It waits (blocks) on the completion of this run, using run.wait_status(*client.runs.FINAL). Finally, it examines the final state of the run and raises a ValueError if it failed; this will cause Prefect to mark the task (and thus, flow) as failed as well.

Trying out our flow

In a production setting, you would pair a version of this Python file defining the flow with a Prefect Server for scheduling it. However, Prefect makes it trivially easy to test flows on your local machine, no scheduler process needed.

Running pip install prefect, then python example_flow.py:

$ python example_flow.py
[2021-09-20 17:14:42-0400] INFO - prefect.FlowRunner | Beginning Flow run for 'example-flow'
[2021-09-20 17:14:42-0400] INFO - prefect.TaskRunner | Task 'SPELL_TOKEN': Starting task run...
[2021-09-20 17:14:42-0400] INFO - prefect.TaskRunner | Task 'SPELL_TOKEN': Finished task run for task with final state: 'Success'
[2021-09-20 17:14:42-0400] INFO - prefect.TaskRunner | Task 'CreateSpellRun': Starting task run...
[2021-09-20 17:14:49-0400] INFO - prefect.TaskRunner | Task 'CreateSpellRun': Finished task run for task with final state: 'Success'
[2021-09-20 17:14:49-0400] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded

Visiting spell-org in the Spell web console will show us the successful run:

From here you can modify the example flow however you'd like to fit your regularly scheduled training pipeline and/or model management needs.

You can check out this code and some more advanced examples in our spellml/examples repo.

Happy training!
