You've added upstream assets to your data pipeline, but nothing downstream - until now. In this step, you'll create a Dagster asset called order_count_chart that's downstream of the customers asset produced by the stg_orders and stg_customers dbt models. The order_count_chart asset computes a plotly chart using the data from its upstream asset dependency.
Here's a quick refresher of what our graph of assets currently looks like:
To add the order_count_chart asset:
In /tutorial_template/tutorial_dbt_dagster/assets/__init__.py, replace the imports section with the following:
import pandas as pd
import plotly.express as px
from dagster_dbt import load_assets_from_dbt_project
from dagster import AssetIn, MetadataValue, asset, file_relative_path
Next, add the order_count_chart asset definition after the dbt_assets definition, as in the complete file shown below. In that definition:
customers is supplied as an argument to ins, which defines it as an upstream asset dependency of the order_count_chart asset.
We've also used AssetIn to explicitly define that upstream dependency. Just like in part three of this tutorial, the key_prefix and group_name are set to jaffle_shop and staging, respectively.
Finally, the chart is saved as order_count_chart.html next to the asset module, in /tutorial_template/tutorial_dbt_dagster/assets, and automatically opened in the browser upon successful materialization.
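To make the dependency wiring concrete, here is a minimal sketch in plain Python of how a key prefix and a name combine into a full asset key. The build_asset_key helper is hypothetical and only illustrates the idea; it is not part of the Dagster API.

```python
from typing import List, Optional


def build_asset_key(name: str, key_prefix: Optional[List[str]] = None) -> List[str]:
    """Hypothetical helper: prepend the optional prefix to the asset name."""
    return [*(key_prefix or []), name]


# The "customers" entry in ins, combined with AssetIn(key_prefix=["jaffle_shop"]),
# points at the upstream asset whose key is jaffle_shop/customers.
upstream_key = build_asset_key("customers", key_prefix=["jaffle_shop"])

# order_count_chart sets no key_prefix, so its key is just its function name.
chart_key = build_asset_key("order_count_chart")

print("/".join(upstream_key))
print("/".join(chart_key))
```

The point of the sketch is that the name used in ins plus the prefix passed to AssetIn must match the key of the dbt-produced customers asset, which is loaded with the same jaffle_shop prefix.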
At this point, /tutorial_template/tutorial_dbt_dagster/assets/__init__.py should look like this:
import pandas as pd
import plotly.express as px
from dagster_dbt import load_assets_from_dbt_project
from dagster import AssetIn, MetadataValue, asset, file_relative_path
@asset(key_prefix=["jaffle_shop"], group_name="staging")
def customers_raw() -> pd.DataFrame:
    data = pd.read_csv("https://docs.dagster.io/assets/customers.csv")
    return data


@asset(key_prefix=["jaffle_shop"], group_name="staging")
def orders_raw() -> pd.DataFrame:
    data = pd.read_csv("https://docs.dagster.io/assets/orders.csv")
    return data


DBT_PROJECT_PATH = file_relative_path(__file__, "../../jaffle_shop")
DBT_PROFILES = file_relative_path(__file__, "../../jaffle_shop/config")

dbt_assets = load_assets_from_dbt_project(
    project_dir=DBT_PROJECT_PATH, profiles_dir=DBT_PROFILES, key_prefix=["jaffle_shop"]
)


@asset(
    ins={"customers": AssetIn(key_prefix=["jaffle_shop"])},
    group_name="staging",
)
def order_count_chart(context, customers: pd.DataFrame):
    fig = px.histogram(customers, x="number_of_orders")
    fig.update_layout(bargap=0.2)
    save_chart_path = file_relative_path(__file__, "order_count_chart.html")
    fig.write_html(save_chart_path, auto_open=True)
    context.add_output_metadata({"plot_url": MetadataValue.url("file://" + save_chart_path)})
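The paths in this file are all built relative to the module itself. The following sketch uses only the standard library to show where they end up; relative_to_file is a hypothetical stand-in that assumes file_relative_path joins the directory containing the given file with the relative path, and posixpath is used so the output is the same on every platform.

```python
import posixpath


def relative_to_file(dunder_file: str, relative_path: str) -> str:
    """Hypothetical stand-in: join the directory of dunder_file with relative_path."""
    return posixpath.join(posixpath.dirname(dunder_file), relative_path)


# Location of the module that defines the assets, as given in this tutorial.
module_file = "/tutorial_template/tutorial_dbt_dagster/assets/__init__.py"

# The chart is written next to the module, inside the assets directory.
save_chart_path = relative_to_file(module_file, "order_count_chart.html")

# The metadata value is the same path with a file:// scheme in front.
plot_url = "file://" + save_chart_path

# "../../jaffle_shop" climbs two directories up from assets/.
dbt_project_path = posixpath.normpath(relative_to_file(module_file, "../../jaffle_shop"))

print(save_chart_path)
print(plot_url)
print(dbt_project_path)
```

Under these assumptions the chart lands in the assets directory alongside __init__.py, and the dbt project resolves to /tutorial_template/jaffle_shop.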
In this step, you'll materialize the order_count_chart asset. When successfully materialized, a new tab containing the plotly chart will automatically open in your browser.
Back in Dagit on the asset graph, click Reload definitions. This ensures that Dagit picks up the changes you made in the previous steps.
At this point, the order_count_chart asset should display below customers as a downstream dependency:
Click the order_count_chart asset to select it.
Click Materialize selected to kick off a run that materializes the asset.
That's it! When the run successfully completes, the following chart will automatically open in your browser:
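As a rough idea of what the chart summarizes, the sketch below counts how many customers placed each number of orders. The values are made up for illustration and are not the tutorial's data, and the sketch assumes one bar per distinct value, whereas plotly chooses its own bins.

```python
from collections import Counter

# Made-up values standing in for the customers["number_of_orders"] column.
number_of_orders = [1, 2, 1, 3, 2, 1, 5]

# Each count is the height of one bar: customers who placed that many orders.
orders_histogram = Counter(number_of_orders)

for order_count in sorted(orders_histogram):
    print(order_count, orders_histogram[order_count])
```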
That's the end of this tutorial - congratulations! By now, you should have a working dbt and Dagster integration and a handful of materialized Dagster assets.