Using Dagster with Great Expectations

You can find the code for this example on GitHub.

This example demonstrates how to use the Great Expectations (GE) op factory from the dagster-ge package to test incoming data against a set of expectations built with Great Expectations' tooling.

For this example, we'll be using two versions of a dataset of baseball team payroll and wins, with one version modified to hold incorrect data.

You can use ge_validation_op_factory to generate Dagster ops that integrate with Great Expectations. For example, here is a basic call to this GE op factory, passing a name for the op along with the two GE-specific arguments: datasource_name and suite_name (the expectation suite to run).

from dagster_ge.factory import ge_validation_op_factory

payroll_expectations = ge_validation_op_factory(
    name="ge_validation_op", datasource_name="getest", suite_name="basic.warning"
)

The GE validation happens inside the op created above. The op yields an ExpectationResult with a structured dict of metadata from the GE suite. The structured metadata contains both summary statistics from the suite and expectation-by-expectation results. The op also outputs the full result in case you want to process it differently. Here's how a downstream op could use the full result, where expectation is the result:

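from dagster import op


@op
def postprocess_payroll(numrows, expectation):
    # `expectation` is the full validation result output by the GE op.
    if expectation["success"]:
        return numrows
    else:
        raise ValueError("Great Expectations validation failed")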
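Beyond the top-level success flag, the full result can be inspected in more detail. The following is a minimal sketch, assuming the result follows the usual Great Expectations validation result layout (a "statistics" dict and a "results" list whose items carry an "expectation_config"). The helper name summarize_validation, the sample dict, and the column names in it are hypothetical and hand-written for illustration; they are not output from the suite in this example.

```python
def summarize_validation(result):
    """Return (passed, total, failed_types) from a GE-style validation result dict."""
    stats = result.get("statistics", {})
    # Collect the expectation types of every expectation that did not pass.
    failed_types = [
        r["expectation_config"]["expectation_type"]
        for r in result.get("results", [])
        if not r["success"]
    ]
    return (
        stats.get("successful_expectations", 0),
        stats.get("evaluated_expectations", 0),
        failed_types,
    )


# Hand-written sample for illustration only (assumed layout, hypothetical columns).
sample_result = {
    "success": False,
    "statistics": {
        "evaluated_expectations": 2,
        "successful_expectations": 1,
        "unsuccessful_expectations": 1,
        "success_percent": 50.0,
    },
    "results": [
        {
            "success": True,
            "expectation_config": {
                "expectation_type": "expect_column_to_exist",
                "kwargs": {"column": "Team"},
            },
        },
        {
            "success": False,
            "expectation_config": {
                "expectation_type": "expect_column_values_to_not_be_null",
                "kwargs": {"column": "Payroll"},
            },
        },
    ],
}

passed, total, failed = summarize_validation(sample_result)
print(f"{passed}/{total} expectations passed; failed: {failed}")
```

A helper like this could be called inside a downstream op to build a more informative error message than a bare exception.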

You can configure the GE Data Context via the ge_data_context resource from the dagster-ge integration package. All we need to do to expose GE to Dagster is to provide the root of the GE directory (the path to the great_expectations directory on your machine).

Finally, here's the full job definition using the GE op, with a default run configuration to use the correct set of data:

@job(
    resource_defs={"ge_data_context": ge_data_context},
    config={
        "resources": {
            "ge_data_context": {
                "config": {"ge_root_dir": file_relative_path(__file__, "./great_expectations")}
            }
        },
        "ops": {
            "read_in_datafile": {
                "inputs": {
                    "csv_path": {"value": file_relative_path(__file__, "./data/succeed.csv")}
                }
            }
        },
    },
)
def payroll_data():
    # read_in_datafile and process_payroll are ops defined elsewhere in the example code.
    output_df = read_in_datafile()

    postprocess_payroll(process_payroll(output_df), payroll_expectations(output_df))

To exercise the job with incorrect data, swap the path to succeed.csv for fail.csv in the run configuration.
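One way to make that swap without editing the job definition is to build the run configuration in a small helper. This is a sketch under stated assumptions: build_run_config is a hypothetical helper, not part of dagster or dagster-ge, and it uses pathlib in place of file_relative_path so that it stands alone. It assumes the same directory layout as the job above (a great_expectations directory and a data directory under a common base).

```python
from pathlib import Path


def build_run_config(base_dir, csv_name):
    """Build a run config dict mirroring the job's default config (hypothetical helper)."""
    base = Path(base_dir)
    return {
        "resources": {
            "ge_data_context": {
                "config": {"ge_root_dir": str(base / "great_expectations")}
            }
        },
        "ops": {
            "read_in_datafile": {
                "inputs": {"csv_path": {"value": str(base / "data" / csv_name)}}
            }
        },
    }


# Point the job at the dataset containing incorrect data.
fail_config = build_run_config(".", "fail.csv")

# With the job defined above, the config could then be passed at launch, e.g.:
# payroll_data.execute_in_process(run_config=fail_config)
```

If the suite flags the modified data, the run should fail in postprocess_payroll, since that op raises when the validation result is not successful.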