Using Airbyte with Dagster#

Dagster can orchestrate your Airbyte connections, making it easy to chain an Airbyte sync with upstream or downstream steps in your workflow.

This guide focuses on how to work with Airbyte connections using Dagster's software-defined asset (SDA) framework.

[Screenshot: the Airbyte UI in a browser, showing the connection ID in the URL.]

Airbyte connections and Dagster software-defined assets#

An Airbyte connection defines a series of data streams which are synced between a source and a destination. During a sync, a replica of the data from each data stream is written to the destination, typically as one or more tables. Dagster represents each of the replicas generated in the destination as a software-defined asset. This enables you to easily:

  • Visualize the streams involved in an Airbyte connection and execute a sync from Dagster
  • Define downstream computations which depend on replicas produced by Airbyte
  • Track historical metadata and logs for each data stream
  • Track data lineage through Airbyte and other tools
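
To make the stream-to-asset mapping concrete, here is a minimal sketch in plain Python. The Connection class, its fields, and the asset_names helper are hypothetical stand-ins for illustration and are not part of dagster-airbyte; the sketch only shows the idea that each synced stream yields one asset.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Connection:
    """Hypothetical, simplified stand-in for an Airbyte connection."""

    name: str
    streams: tuple  # names of the streams selected for syncing
    table_prefix: str = ""  # optional prefix applied to destination tables


def asset_names(connection):
    """Return one asset name per stream: one replica in the destination, one asset."""
    return [f"{connection.table_prefix}{stream}" for stream in connection.streams]


github = Connection(name="GitHub to Snowflake", streams=("stargazers", "commits"))
print(asset_names(github))  # ['stargazers', 'commits']
```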

Prerequisites#

To get started, you will need to install the dagster and dagster-airbyte Python packages:

pip install dagster dagster-airbyte
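
If you want to confirm that both packages are importable in the current environment, a small check along these lines may help. The missing_packages helper is a hypothetical name for this sketch; it relies only on the standard library.

```python
from importlib.util import find_spec


def missing_packages(modules=("dagster", "dagster_airbyte")):
    """Return the top-level module names that cannot be found in this environment."""
    return [name for name in modules if find_spec(name) is None]


missing = missing_packages()
if missing:
    print("Not installed:", ", ".join(missing))
else:
    print("dagster and dagster-airbyte are importable")
```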

You'll also want to have an Airbyte instance running. If you don't have one already, you can run Airbyte locally using docker-compose.


Step 1: Connecting to Airbyte#

The first step in using Airbyte with Dagster is to tell Dagster how to connect to your Airbyte instance using an Airbyte resource. This resource contains information on where the Airbyte instance is located and any credentials needed to access it.

from dagster_airbyte import airbyte_resource

airbyte_instance = airbyte_resource.configured(
    {
        "host": "localhost",
        "port": 8000,
        # If using basic auth, include username and password:
        "username": "airbyte",
        "password": {"env": "AIRBYTE_PASSWORD"},
    }
)

If you're running Airbyte locally using docker-compose, set the host and port parameters to localhost and 8000, respectively. The default basic auth credentials are the username airbyte and the password password.

If you're hosting Airbyte externally, you'll need to provide a hostname where the Airbyte webapp and API are accessible, typically on port 80. For more information on the configuration options available for the Airbyte resource, see the API reference.
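
Before wiring the resource into Dagster, it can be useful to verify that the host and port actually reach Airbyte. The sketch below uses only the standard library and assumes the instance exposes a health endpoint at /api/v1/health that returns JSON with an "available" field; the helper names are hypothetical and this is not how dagster-airbyte itself connects.

```python
import base64
import json
from urllib.request import Request, urlopen


def airbyte_api_url(host, port, use_https=False):
    """Build the base API URL (assumes the API is served under /api/v1)."""
    scheme = "https" if use_https else "http"
    return f"{scheme}://{host}:{port}/api/v1"


def basic_auth_header(username, password):
    """Build an HTTP basic auth header for instances that require credentials."""
    token = base64.b64encode(f"{username}:{password}".encode("utf8")).decode("ascii")
    return {"Authorization": f"Basic {token}"}


def airbyte_is_available(host, port, username=None, password=None, timeout=5.0):
    """Return True only if the health endpoint answers and reports availability."""
    headers = basic_auth_header(username, password) if username else {}
    request = Request(f"{airbyte_api_url(host, port)}/health", headers=headers)
    try:
        with urlopen(request, timeout=timeout) as response:
            payload = json.load(response)
    except (OSError, ValueError):
        # Connection errors, HTTP errors, and malformed JSON all count as unavailable.
        return False
    return isinstance(payload, dict) and bool(payload.get("available"))


print(airbyte_api_url("localhost", 8000))  # http://localhost:8000/api/v1
```

For a local docker-compose deployment, you would call airbyte_is_available("localhost", 8000, "airbyte", your_password) and expect True once Airbyte has finished starting.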


Step 2: Loading Airbyte asset definitions into Dagster#

The easiest way to get started using Airbyte with Dagster is to have Dagster automatically generate asset definitions from your Airbyte project. This can be done in one of two ways; the approach covered below loads the definitions from a live Airbyte instance.

You can also manually build asset definitions on a per-connection basis.

Loading Airbyte asset definitions from an Airbyte instance#

To load Airbyte assets into Dagster from a live Airbyte instance, you will need to supply the Airbyte resource that we defined in Step 1. Here, the Airbyte instance is treated as the source of truth.

from dagster_airbyte import load_assets_from_airbyte_instance

# Use the airbyte_instance resource we defined in Step 1
airbyte_assets = load_assets_from_airbyte_instance(airbyte_instance)

The load_assets_from_airbyte_instance function retrieves all of the connections you have defined in the Airbyte interface, creating software-defined assets for each data stream. Each connection has an associated op which triggers a sync of that connection.
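
To illustrate what triggering a sync involves, here is a rough sketch of a trigger-and-poll loop. It is not the dagster-airbyte implementation. It assumes Airbyte API endpoints /connections/sync and /jobs/get that return a "job" object with "id" and "status" fields, and it takes the HTTP call as an injected post function (a hypothetical signature) so that the logic can be exercised without a running instance.

```python
import time

# Job statuses treated as final; assumed from Airbyte's job model.
TERMINAL_STATUSES = {"succeeded", "failed", "cancelled"}


def sync_and_wait(post, connection_id, poll_interval=10.0, sleep=time.sleep):
    """Start a sync for one connection and poll until the job reaches a final status.

    post(endpoint, body) must send a JSON POST request and return the parsed response.
    """
    job = post("/connections/sync", {"connectionId": connection_id})["job"]
    while job["status"] not in TERMINAL_STATUSES:
        sleep(poll_interval)
        job = post("/jobs/get", {"id": job["id"]})["job"]
    return job["status"]


# Fake transport for demonstration: the job is running first, then succeeds.
_responses = iter(
    [
        {"job": {"id": 1, "status": "running"}},
        {"job": {"id": 1, "status": "succeeded"}},
    ]
)


def fake_post(endpoint, body):
    return next(_responses)


print(sync_and_wait(fake_post, "my-connection-id", sleep=lambda seconds: None))  # succeeded
```

Injecting post and sleep keeps the loop independent of any HTTP client; in Dagster, the generated op handles this for you.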


Step 3: Adding downstream assets#

Looking to orchestrate Airbyte with dbt? Check out our Modern Data Stack example and our dbt integration docs.

Once you have loaded your Airbyte assets into Dagster, you can create assets which depend on them. These can be other assets pulled in from external sources such as dbt or assets defined in Python code.

In this case, we have an Airbyte connection which is storing data in the stargazers table in our Snowflake warehouse. We specify the output IO manager to tell downstream assets how to retrieve the data.

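import json
from dagster import asset, repository, with_resources
from dagster_airbyte import load_assets_from_airbyte_instance

# airbyte_instance is the resource configured in Step 1.
# io_manager_key names the IO manager that downstream assets use to load the synced tables.
airbyte_assets = load_assets_from_airbyte_instance(
    airbyte_instance,
    io_manager_key="snowflake_io_manager",
)

# The stargazers argument is loaded from the stargazers table by the IO manager.
@asset
def stargazers_file(stargazers):
    with open("stargazers.json", "w", encoding="utf8") as f:
        f.write(json.dumps(stargazers, indent=2))

# snowflake_io_manager is assumed to be a Snowflake IO manager resource defined elsewhere in your project.
@repository
def my_repo():
    return [
        with_resources(
            [airbyte_assets, stargazers_file],
            {"snowflake_io_manager": snowflake_io_manager},
        )
    ]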
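
The body of stargazers_file can be tried on its own, outside Dagster. The sketch below assumes the IO manager hands the table to the asset as a list of JSON-serializable dictionaries; what you actually receive depends on how the IO manager is configured, and something like a DataFrame would need converting before json.dumps accepts it. The sample row and the write_stargazers helper are made up for illustration.

```python
import json
import os
import tempfile


def write_stargazers(rows, path):
    """Write rows to path as indented JSON, like the body of the stargazers_file asset."""
    with open(path, "w", encoding="utf8") as f:
        f.write(json.dumps(rows, indent=2))
    return path


# Made-up sample data standing in for the synced stargazers table.
rows = [{"login": "octocat", "starred_at": "2023-01-01T00:00:00Z"}]

path = write_stargazers(rows, os.path.join(tempfile.mkdtemp(), "stargazers.json"))
with open(path, encoding="utf8") as f:
    print(json.load(f) == rows)  # True
```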

Step 4: Scheduling Airbyte syncs#

Once you have Airbyte assets, you can define a job that runs some or all of these assets on a schedule, triggering the underlying Airbyte sync.

from dagster import ScheduleDefinition, define_asset_job, repository, AssetSelection

# materialize all assets in the repository
run_everything_job = define_asset_job("run_everything", selection="*")

# only run my_airbyte_connection and downstream assets
my_etl_job = define_asset_job(
    "my_etl_job", AssetSelection.groups("my_airbyte_connection").downstream()
)

@repository
def my_repo():
    return [
        airbyte_assets,
        ScheduleDefinition(
            job=my_etl_job,
            cron_schedule="@daily",
        ),
        ScheduleDefinition(
            job=run_everything_job,
            cron_schedule="@weekly",
        ),
    ]
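
The selection above assumes the assets of one connection sit in a group named my_airbyte_connection. Dagster group names may only contain letters, digits, and underscores, so a connection name such as "My Airbyte Connection" cannot be used verbatim. The sketch below shows one plausible normalization (lowercase, with runs of other characters replaced by an underscore). It is an assumption for illustration, not the library's code, so check the Dagster UI for the group name your connection actually receives.

```python
import re


def to_group_name(connection_name):
    """Lowercase the name and collapse each run of non-alphanumerics into one underscore."""
    return re.sub(r"[^a-z0-9]+", "_", connection_name.lower())


print(to_group_name("My Airbyte Connection"))  # my_airbyte_connection
```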

Refer to the Schedule documentation for more info on running jobs on a schedule.
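
The "@daily" and "@weekly" values used above are standard cron shorthand. As a quick reference, this sketch maps the common aliases to their five-field equivalents; the CRON_ALIASES table and expand_cron helper are hypothetical names, and either form can be passed as cron_schedule.

```python
# Standard cron shorthand and the equivalent five-field expressions.
CRON_ALIASES = {
    "@hourly": "0 * * * *",   # minute 0 of every hour
    "@daily": "0 0 * * *",    # midnight every day
    "@weekly": "0 0 * * 0",   # midnight every Sunday
    "@monthly": "0 0 1 * *",  # midnight on the first day of each month
}


def expand_cron(schedule):
    """Return the five-field form of a known alias, or the input unchanged."""
    return CRON_ALIASES.get(schedule, schedule)


print(expand_cron("@daily"))   # 0 0 * * *
print(expand_cron("@weekly"))  # 0 0 * * 0
```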


Conclusion#

If you find a bug or want to add a feature to the dagster-airbyte library, we invite you to contribute.

If you have questions on using Airbyte with Dagster, we'd love to hear from you:

Join us on Slack