Transitioning Data Pipelines from Development to Production#

In this guide, we'll walk through how to transition your data pipelines from local development to staging and production deployments.

Let's say we’ve been tasked with fetching the N most recent entries from Hacker News and splitting them into two datasets: one containing all of the data about stories and one containing all of the data about comments. In order to make the pipeline maintainable and testable, we have two additional requirements:

  • We must be able to run our data pipeline in local, staging, and production environments.
  • We need to be confident that data won't be accidentally overwritten (for example because a user forgot to change a configuration value).

Using a few Dagster concepts, we can easily tackle this task! Here’s an overview of the main concepts we’ll be using in this guide:

  • Assets - An asset is a software object that models a data asset. The prototypical example is a table in a database or a file in cloud storage.
  • Resources - A resource is an object that models a connection to a (typically) external service. Resources can be shared between assets, and different implementations of resources can be used depending on the environment. For example, a resource may provide methods to send messages in Slack.
  • I/O managers - An I/O manager is a special kind of resource that handles storing and loading assets. For example, if we wanted to store assets in S3, we could use Dagster’s built-in S3 I/O manager.
  • Run config - Assets and resources sometimes require configuration to set certain values, like the password to a database. Run config allows you to set these values at run time. In this guide, we will also use the configured API to supply some of this configuration up front.

Using these Dagster concepts we will:

  • Write three assets: the full Hacker News dataset, data about comments, and data about stories.
  • Use Dagster's Snowflake I/O manager to store the datasets in Snowflake.
  • Set up our Dagster repository so that the configuration for the Snowflake I/O manager is automatically supplied based on the environment where the code is running.

Setup#

You can find the code for this example on GitHub.

To follow along with this guide, you can copy the full code example and install a few additional pip libraries:

dagster project from-example --name my-dagster-project --example development_to_production
cd my-dagster-project
pip install -e .

Part one: Local development#

In this section we will:

  • Write our assets
  • Add the assets to our repository
  • Add run configuration for the Snowflake I/O manager
  • Materialize assets in Dagit

Let’s start by writing our three assets. We'll use Pandas DataFrames to interact with the data.

# assets.py
import pandas as pd
import requests

from dagster import asset


@asset(
    config_schema={"N": int},
    io_manager_key="snowflake_io_manager",
)
def items(context) -> pd.DataFrame:
    """Items from the Hacker News API: each is a story or a comment on a story."""
    rows = []
    max_id = requests.get(
        "https://hacker-news.firebaseio.com/v0/maxitem.json", timeout=5
    ).json()
    # Hacker News API is 1-indexed, so adjust range by 1
    for item_id in range(max_id - context.op_config["N"] + 1, max_id + 1):
        item_url = f"https://hacker-news.firebaseio.com/v0/item/{item_id}.json"
        rows.append(requests.get(item_url, timeout=5).json())

    # ITEM_FIELD_NAMES is a list of the column names in the Hacker News dataset,
    # defined in the full code example
    result = pd.DataFrame(rows, columns=ITEM_FIELD_NAMES).drop_duplicates(subset=["id"])
    result.rename(columns={"by": "user_id"}, inplace=True)
    return result


@asset(
    io_manager_key="snowflake_io_manager",
)
def comments(items: pd.DataFrame) -> pd.DataFrame:
    """Comments from the Hacker News API."""
    return items[items["type"] == "comment"]


@asset(
    io_manager_key="snowflake_io_manager",
)
def stories(items: pd.DataFrame) -> pd.DataFrame:
    """Stories from the Hacker News API."""
    return items[items["type"] == "story"]

Now we can add these assets to our repository and materialize them via Dagit as part of our local development workflow. We will use the configured API to add configuration for the snowflake_io_manager.

# repository.py
from dagster_snowflake import build_snowflake_io_manager
from dagster_snowflake_pandas import SnowflakePandasTypeHandler
from development_to_production.assets import comments, items, stories

from dagster import repository, with_resources

snowflake_io_manager = build_snowflake_io_manager([SnowflakePandasTypeHandler()])


# Note that storing passwords in configuration is bad practice. It will be resolved later in the guide.
@repository
def repo():
    resource_defs = {
        "snowflake_io_manager": snowflake_io_manager.configured(
            {
                "account": "abc1234.us-east-1",
                "user": "me@company.com",
                # password in config is bad practice
                "password": "my_super_secret_password",
                "database": "LOCAL",
                "schema": "ALICE",
            }
        ),
    }

    return [*with_resources([items, comments, stories], resource_defs=resource_defs)]

Note that we have passwords in our configuration in this code snippet. This is bad practice, and we will resolve it shortly.

This results in an asset graph that looks like this:

[Screenshot: the asset graph in Dagit, showing the items, comments, and stories assets]

We can materialize the assets in Dagit and ensure that the data appears in Snowflake as we expect:

[Screenshot: materializing the assets in Dagit]

While we define our assets as Pandas DataFrames, the Snowflake I/O manager automatically translates the data to and from Snowflake tables. The Python asset name determines the Snowflake table name. In this case three tables will be created: ITEMS, COMMENTS and STORIES.
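
Because the items asset declares config_schema={"N": int}, a value for N has to be supplied as run config when launching a run in Dagit. If you'd rather kick off the same materialization from a script, the following is a minimal sketch using Dagster's materialize helper; the script name is hypothetical, and the credentials are the same placeholder values used in repository.py:

# local_materialize.py (hypothetical helper script, not part of the example project)
from dagster_snowflake import build_snowflake_io_manager
from dagster_snowflake_pandas import SnowflakePandasTypeHandler
from development_to_production.assets import comments, items, stories

from dagster import materialize, with_resources

snowflake_io_manager = build_snowflake_io_manager([SnowflakePandasTypeHandler()])

if __name__ == "__main__":
    result = materialize(
        with_resources(
            [items, comments, stories],
            resource_defs={
                "snowflake_io_manager": snowflake_io_manager.configured(
                    {
                        "account": "abc1234.us-east-1",
                        "user": "me@company.com",
                        # password in config is bad practice, as noted above
                        "password": "my_super_secret_password",
                        "database": "LOCAL",
                        "schema": "ALICE",
                    }
                ),
            },
        ),
        # the config_schema={"N": int} on the items asset is satisfied via run config
        run_config={"ops": {"items": {"config": {"N": 100}}}},
    )
    assert result.success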


Part two: Deployment#

In this section we will:

  • Modify the configuration for the Snowflake I/O manager to handle staging and production environments
  • Discuss different options for managing a staging environment

Now that our assets work locally, we can start the deployment process! We'll first set up our assets for production, and then discuss the options for our staging deployment.

Production#

We want to store the assets in a production Snowflake database, so we need to update the configuration for the snowflake_io_manager. But if we were to simply update the values we set for local development, we would run into an issue: the next time a developer wants to work on these assets, they will need to remember to change the configuration back to the local values. This leaves room for a developer to accidentally overwrite the production asset during local development.

Instead, we can set up the repository to determine the configuration for resources based on the environment:

# repository.py
import os

# Note that storing passwords in configuration is bad practice. It will be resolved soon.
@repository
def repo():
    resource_defs = {
        "local": {
            "snowflake_io_manager": snowflake_io_manager.configured(
                {
                    "account": "abc1234.us-east-1",
                    "user": "me@company.com",
                    # password in config is bad practice
                    "password": "my_super_secret_password",
                    "database": "LOCAL",
                    "schema": "ALICE",
                }
            ),
        },
        "production": {
            "snowflake_io_manager": snowflake_io_manager.configured(
                {
                    "account": "abc1234.us-east-1",
                    "user": "dev@company.com",
                    # password in config is bad practice
                    "password": "company_super_secret_password",
                    "database": "PRODUCTION",
                    "schema": "HACKER_NEWS",
                }
            ),
        },
    }
    deployment_name = os.getenv("DAGSTER_DEPLOYMENT", "local")

    return [
        *with_resources(
            [items, comments, stories], resource_defs=resource_defs[deployment_name]
        )
    ]

Note that we still have passwords in our configuration in this code snippet. This is bad practice, and we will resolve it next.

Now, we can set the environment variable DAGSTER_DEPLOYMENT=production in our deployment and the correct resources will be applied to the assets.

We still have some problems with this setup:

  1. Developers need to remember to change user and password to their own credentials, and schema to their own name, when developing locally.
  2. Passwords are being stored in code.

We can easily solve these problems because the Snowflake I/O manager accepts configuration from environment variables via the StringSource configuration type. This allows us to store sensitive values as environment variables and point Dagster to those environment variables in the resource configuration:

# repository.py


@repository
def repo():
    resource_defs = {
        "local": {
            "snowflake_io_manager": snowflake_io_manager.configured(
                {
                    "account": "abc1234.us-east-1",
                    "user": {"env": "DEV_SNOWFLAKE_USER"},
                    "password": {"env": "DEV_SNOWFLAKE_PASSWORD"},
                    "database": "LOCAL",
                    "schema": {"env": "DEV_SNOWFLAKE_SCHEMA"},
                }
            ),
        },
        "production": {
            "snowflake_io_manager": snowflake_io_manager.configured(
                {
                    "account": "abc1234.us-east-1",
                    "user": "system@company.com",
                    "password": {"env": "SYSTEM_SNOWFLAKE_PASSWORD"},
                    "database": "PRODUCTION",
                    "schema": "HACKER_NEWS",
                }
            ),
        },
    }
    deployment_name = os.getenv("DAGSTER_DEPLOYMENT", "local")

    return [
        *with_resources(
            [items, comments, stories], resource_defs=resource_defs[deployment_name]
        )
    ]

Staging#

Depending on your organization’s Dagster setup, there are a couple of options for a staging environment.

  • For Dagster Cloud users, we recommend using Branch Deployments as your staging step. A branch deployment is a new Dagster deployment that is automatically generated for each git branch. Check out our comprehensive guide to branch deployments to learn how to use branch deployments to verify data pipelines before deploying them to production.

  • For a self-hosted staging deployment, we’ve already done most of the necessary work to run our assets in staging! All we need to do is add another entry to the resource_defs dictionary in our repository and set DAGSTER_DEPLOYMENT=staging in our staging deployment.

resource_defs = {
    "local": {...},
    "production": {...},
    "staging": {
        "snowflake_io_manager": snowflake_io_manager.configured(
            {
                "account": "abc1234.us-east-1",
                "user": "system@company.com",
                "password": {"env": "SYSTEM_SNOWFLAKE_PASSWORD"},
                "database": "STAGING",
                "schema": "HACKER_NEWS",
            }
        ),
    },
}

Advanced: Unit tests with stubs and mocks#

You may have noticed a missing step in the development workflow presented in this guide — unit tests! While the main purpose of the guide is to help you transition your code from local development to a production deployment, unit testing is still an important part of the development cycle. In this section, we'll explore a pattern you may find useful when writing your own unit tests.

When we write unit tests for the items asset, we could make more precise assertions if we knew exactly what data we'd receive from Hacker News. If we refactor our interactions with the Hacker News API as a resource, we can leverage Dagster's resource system to provide a stub resource in our unit tests.

Before we get into implementation, let's go over some best practices:

When to use resources#

In many cases, interacting with an external service directly in assets or ops is more convenient than refactoring the interactions with the service as a resource. We recommend refactoring code to use resources in the following cases:

  • Multiple assets or ops need to interact with the service in a consistent way
  • Different implementations of a service need to be used in certain scenarios (e.g., a staging environment or unit tests)

When to use stub resources#

Determining when it makes sense to stub a resource for a unit test can be a topic of much debate. There are certainly some resources where it would be too complicated to write and maintain a stub. For example, it would be difficult to mock a database like Snowflake with a lightweight database since the SQL syntax may vary. In general, if a resource is relatively simple, writing a stub can be helpful for unit testing the assets and ops that use the resource.

We'll start by writing the "real" Hacker News API Client:

# resources.py
from typing import Any, Dict, Optional

import requests

from dagster import resource


class HNAPIClient:
    """
    Hacker News client that fetches live data
    """

    def fetch_item_by_id(self, item_id: int) -> Optional[Dict[str, Any]]:
        """Fetches a single item from the Hacker News API by item id."""

        item_url = f"https://hacker-news.firebaseio.com/v0/item/{item_id}.json"
        item = requests.get(item_url, timeout=5).json()
        return item

    def fetch_max_item_id(self) -> int:
        return requests.get(
            "https://hacker-news.firebaseio.com/v0/maxitem.json", timeout=5
        ).json()


@resource
def hn_api_client():
    return HNAPIClient()

We'll also need to update the items asset to use this resource:

# assets.py


@asset(
    config_schema={"N": int},
    required_resource_keys={"hn_client"},
    io_manager_key="snowflake_io_manager",
)
def items(context) -> pd.DataFrame:
    """Items from the Hacker News API: each is a story or a comment on a story."""
    hn_client = context.resources.hn_client

    max_id = hn_client.fetch_max_item_id()
    rows = []
    # Hacker News API is 1-indexed, so adjust range by 1
    for item_id in range(max_id - context.op_config["N"] + 1, max_id + 1):
        rows.append(hn_client.fetch_item_by_id(item_id))

    result = pd.DataFrame(rows, columns=hn_client.item_field_names).drop_duplicates(
        subset=["id"]
    )
    result.rename(columns={"by": "user_id"}, inplace=True)
    return result

For the sake of brevity, we've omitted the implementation of the property item_field_names in HNAPIClient. You can find the full implementation of this resource in the full code example on GitHub.

We'll also need to add the hn_api_client resource under the hn_client key in each environment's resource_defs in our repository:

resource_defs = {
    "local": {"hn_client": hn_api_client, "snowflake_io_manager": {...}},
    "production": {"hn_client": hn_api_client, "snowflake_io_manager": {...}},
    "staging": {"hn_client": hn_api_client, "snowflake_io_manager": {...}},
}

Now we can write a stubbed version of the Hacker News resource. We want to make sure the stub implements each method and property that HNAPIClient implements.

# resources.py


class StubHNClient:
    """
    Hacker News Client that returns fake data
    """

    def __init__(self):
        self.data = {
            1: {
                "id": 1,
                "type": "comment",
                "title": "the first comment",
                "by": "user1",
            },
            2: {"id": 2, "type": "story", "title": "an awesome story", "by": "user2"},
        }

    def fetch_item_by_id(self, item_id: int) -> Optional[Dict[str, Any]]:
        return self.data.get(item_id)

    def fetch_max_item_id(self) -> int:
        return 2

    @property
    def item_field_names(self):
        return ["id", "type", "title", "by"]


@resource
def stub_hn_client():
    return StubHNClient()

Since the stub Hacker News resource and the real Hacker News resource need to implement the same methods, this would be a great time to write an interface. We’ll skip the implementation in this guide, but you can find it in the full code example.
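
For illustration, here is a minimal sketch of what such an interface could look like using Python's abc module; the method names mirror the clients above, but this is a sketch rather than the exact code from the example project:

# resources.py (illustrative sketch of a shared client interface)
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional


class HNClient(ABC):
    """Interface implemented by both the real and stub Hacker News clients."""

    @abstractmethod
    def fetch_item_by_id(self, item_id: int) -> Optional[Dict[str, Any]]:
        ...

    @abstractmethod
    def fetch_max_item_id(self) -> int:
        ...

    @property
    @abstractmethod
    def item_field_names(self) -> List[str]:
        ...

With an interface like this in place, HNAPIClient and StubHNClient would both subclass HNClient, making it harder for the two implementations to drift apart.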

Now we can use the stub Hacker News resource to test that the items asset transforms the data in the way we expect:

# test_assets.py
import pandas as pd
from development_to_production.assets import items
from development_to_production.resources import StubHNClient, stub_hn_client

from dagster import build_op_context

def test_items():
    context = build_op_context(
        resources={"hn_client": stub_hn_client},
        op_config={"N": StubHNClient().fetch_max_item_id()},
    )
    hn_dataset = items(context)
    assert isinstance(hn_dataset, pd.DataFrame)

    expected_data = pd.DataFrame(StubHNClient().data.values()).rename(
        columns={"by": "user_id"}
    )

    assert (hn_dataset == expected_data).all().all()
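
The comments and stories assets don't require any resources or config, so they can be unit tested by invoking them directly with a hand-built DataFrame. A minimal sketch (the test name and sample rows are illustrative):

from development_to_production.assets import comments


def test_comments():
    # a tiny frame containing one comment and one story
    raw_items = pd.DataFrame(
        [
            {"id": 1, "type": "comment", "title": "the first comment", "user_id": "user1"},
            {"id": 2, "type": "story", "title": "an awesome story", "user_id": "user2"},
        ]
    )
    filtered = comments(raw_items)
    assert (filtered["type"] == "comment").all()
    assert len(filtered) == 1

Because direct invocation bypasses the I/O manager, no Snowflake resource is needed for these tests.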

Conclusion#

This guide demonstrates how we recommend writing your assets and jobs so that they transition from local development to staging and production environments without requiring code changes at each step. While we focused on assets in this guide, the same concepts and APIs can be used to swap out run configuration for jobs.