The dagster_pandas library provides the ability to perform data validation, emit summary statistics, and enable reliable dataframe serialization/deserialization. On top of this, the Dagster type system generates documentation of your dataframe constraints and makes it accessible in the Dagit UI.
To create a custom dagster_pandas type, use create_dagster_pandas_dataframe_type and provide a list of PandasColumn objects which specify column-level schema and constraints. For example, we can construct a custom dataframe type to represent a set of e-bike trips in the following way:
By passing in these PandasColumn objects, we are expressing the schema and constraints we expect our dataframes to follow when Dagster performs type checks for our ops. Moreover, if we go to the op viewer, we can follow our schema documented in Dagit:
Now that we have a custom dataframe type that performs schema validation during a run, we can express dataframe level constraints (e.g number of rows, or columns).
To do this, we provide a list of dataframe constraints to create_dagster_pandas_dataframe_type; for example, using RowCountConstraint. More information on the available constraints can be found in the dagster_pandas API docs.
If we rerun the above example with this dataframe, nothing should change. However, if we pass in 100 to the row count constraint, we can watch our job fail that type check.
Aside from constraint validation, create_dagster_pandas_dataframe_type also takes in a summary statistics function that emits MetadataEntry objects which are surfaced during runs. Since data systems seldom control the quality of the data they receive, it becomes important to monitor data as it flows through your systems. In complex jobs, this can help debug and monitor data drift over time. Let's illustrate how this works in our example:
Now if we run this job in the Dagit launchpad, we can see that the SummaryStatsTripDataFrame type is displayed in the logs along with the emitted metadata.
PandasColumn is user-pluggable with custom constraints. They can be constructed directly and passed a list of ColumnConstraint objects.
To tie this back to our example, let's say that we want to validate that the amount paid for a e-bike must be in 5 dollar increments because that is the price per mile rounded up. As a result, let's implement a DivisibleByFiveConstraint. To do this, all it needs is a markdown_description for Dagit which accepts and renders markdown syntax, an error_description for error logs, and a validation method which throws a ColumnConstraintViolationException if a row fails validation. This would look like the following:
classDivisibleByFiveConstraint(ColumnConstraint):def__init__(self):
message ="Value must be divisible by 5"super(DivisibleByFiveConstraint, self).__init__(
error_description=message, markdown_description=message
)defvalidate(self, dataframe, column_name):
rows_with_unexpected_buckets = dataframe[
dataframe[column_name].apply(lambda x: x %5!=0)]ifnot rows_with_unexpected_buckets.empty:raise ColumnConstraintViolationException(
constraint_name=self.name,
constraint_description=self.error_description,
column_name=column_name,
offending_rows=rows_with_unexpected_buckets,)
CustomTripDataFrame = create_dagster_pandas_dataframe_type(
name="CustomTripDataFrame",
columns=[
PandasColumn("amount_paid",
constraints=[
ColumnDTypeInSetConstraint({"int64"}),
DivisibleByFiveConstraint(),],)],)