Fivetran connectors and Dagster software-defined assets#
A Fivetran connector defines a series of schemas and tables which are synced between a source and a destination. During a sync, data is retrieved from the source and written to the destination as one or more tables. Dagster represents each of the replicas generated in the destination as a software-defined asset. This enables you to easily:
Visualize the schemas and tables involved in a Fivetran connector and execute a sync from Dagster
Define downstream computations which depend on replicas produced by Fivetran
Track historical metadata and logs for each table
Track data lineage through Fivetran and other tools
The first step in using Fivetran with Dagster is to tell Dagster how to connect to your Fivetran instance using a Fivetran resource. This resource contains the credentials needed to access your Fivetran instance.
We will supply our credentials as environment variables. For more information on setting environment variables in a production setting, see Using environment variables and secrets.
Then, we can instruct Dagster to authorize the Fivetran resource using the environment variables:
from dagster_fivetran import fivetran_resource
# Pull API key and secret from environment variables
fivetran_instance = fivetran_resource.configured(
    {
        "api_key": {"env": "FIVETRAN_API_KEY"},
        "api_secret": {"env": "FIVETRAN_API_SECRET"},
    }
)
For more information on the additional configuration options available for the Fivetran resource, see the API reference.
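Since the resource configuration above reads both credentials from the environment, it can be useful to confirm that the variables are set before starting Dagster. The following is a minimal sketch of such a check, using only the Python standard library. The helper name `missing_fivetran_env_vars` is hypothetical and not part of `dagster_fivetran`; the variable names are the ones used in the configuration above.

```python
import os
from typing import List, Mapping, Sequence

# Variable names taken from the resource configuration above.
REQUIRED_ENV_VARS = ("FIVETRAN_API_KEY", "FIVETRAN_API_SECRET")


def missing_fivetran_env_vars(
    env: Mapping[str, str] = os.environ,
    required: Sequence[str] = REQUIRED_ENV_VARS,
) -> List[str]:
    """Return the required variable names that are unset or empty.

    Hypothetical helper for illustration; not a dagster_fivetran API.
    """
    return [name for name in required if not env.get(name)]
```

Calling `missing_fivetran_env_vars()` with no arguments inspects the current process environment; an empty list means both credentials are present.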
Step 2: Loading Fivetran asset definitions into Dagster#
The easiest way to get started using Fivetran with Dagster is to have Dagster automatically generate asset definitions from your Fivetran project. Dagster loads these definitions from your Fivetran instance via its API at initialization time.
Loading Fivetran asset definitions from a Fivetran instance#
To load Fivetran assets into Dagster from your Fivetran instance, you will need to supply the Fivetran resource that we defined above in step 1. Here, the Fivetran instance is treated as the source of truth.
from dagster_fivetran import load_assets_from_fivetran_instance
# Use the fivetran_instance resource we defined in Step 1
fivetran_assets = load_assets_from_fivetran_instance(fivetran_instance)
The load_assets_from_fivetran_instance function retrieves all of the connectors you have defined in the Fivetran interface, creating software-defined assets for each generated table. Each connector has an associated op which triggers a sync of that connector.
Instead of having Dagster automatically create the asset definitions for your Fivetran instance, you can opt to build them individually. First, determine the Connector ID of each connector you would like to build assets for. The Connector ID can be found on the Setup page for each connector in the Fivetran UI.
Then, supply to build_fivetran_assets the Connector ID and the list of tables which the connector creates in the destination, with each table written in the format {schema_name}.{table_name}:
from dagster_fivetran import build_fivetran_assets
fivetran_assets = build_fivetran_assets(
    connector_id="omit_constitutional",
    destination_tables=["public.survey_responses", "public.surveys"],
)
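When a connector writes many tables into one schema, the `{schema_name}.{table_name}` strings can be generated rather than typed by hand. The sketch below uses only the standard library. The helper `qualified_table_names` is hypothetical, and the rule that neither name may contain a dot is an assumption made for this illustration, not a documented Fivetran or Dagster constraint.

```python
from typing import Iterable, List


def qualified_table_names(schema_name: str, table_names: Iterable[str]) -> List[str]:
    """Build "{schema_name}.{table_name}" strings for a single schema.

    Hypothetical helper for illustration; not a dagster_fivetran API.
    """
    qualified = []
    for table_name in table_names:
        # Assumption: names are non-empty and contain no dots, so each
        # result has exactly one "." separating schema from table.
        if not schema_name or not table_name or "." in schema_name or "." in table_name:
            raise ValueError(
                f"invalid schema/table pair: {schema_name!r}, {table_name!r}"
            )
        qualified.append(f"{schema_name}.{table_name}")
    return qualified


# Reproduces the list passed as destination_tables in the example above.
destination_tables = qualified_table_names("public", ["survey_responses", "surveys"])
```

The resulting list can be passed as the `destination_tables` argument in place of the literal list.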
Manually-built Fivetran assets require a fivetran_resource, which defines how to connect and interact with your Fivetran instance.
We can add the Fivetran resource we configured above to our Fivetran assets by doing the following:
from dagster_fivetran import build_fivetran_assets
from dagster import with_resources
fivetran_assets = with_resources(
    build_fivetran_assets(
        connector_id="omit_constitutional",
        destination_tables=["public.survey_responses", "public.surveys"],
    ),
    # Use the fivetran_instance resource we defined in Step 1
    {"fivetran": fivetran_instance},
)
Once you have Fivetran assets, you can define a job that runs some or all of these assets on a schedule, triggering the underlying Fivetran sync.
from dagster import ScheduleDefinition, define_asset_job, repository, AssetSelection
# materialize all assets in the repository
run_everything_job = define_asset_job("run_everything", selection="*")

# only run my_fivetran_connection and downstream assets
my_etl_job = define_asset_job(
    "my_etl_job", AssetSelection.groups("my_fivetran_connection").downstream()
)

@repository
def my_repo():
    return [
        fivetran_assets,
        ScheduleDefinition(
            job=my_etl_job,
            cron_schedule="@daily",
        ),
        ScheduleDefinition(
            job=run_everything_job,
            cron_schedule="@weekly",
        ),
    ]
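The `@daily` and `@weekly` values above are common cron aliases. For readers who prefer explicit schedules, the sketch below maps each alias to its conventional five-field cron equivalent. The names `CRON_ALIASES` and `expand_cron_alias` are hypothetical and exist only for this illustration.

```python
# Conventional five-field equivalents of the cron aliases used above.
CRON_ALIASES = {
    "@daily": "0 0 * * *",   # every day at midnight
    "@weekly": "0 0 * * 0",  # every Sunday at midnight
}


def expand_cron_alias(schedule: str) -> str:
    """Return the five-field form of a known alias, or the input unchanged.

    Hypothetical helper for illustration; not a Dagster API.
    """
    return CRON_ALIASES.get(schedule, schedule)
```

An explicit string is useful when a sync should run at a time other than midnight; for example, `"0 6 * * *"` is the cron expression for 06:00 every day.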