Dagster enables you to build testable and maintainable data applications. It provides ways to unit-test your data applications, separate business logic from environments, and set explicit expectations on uncontrollable inputs.
In data applications, testing computations and jobs is notoriously challenging, so they often go relatively untested before hitting production. Where tests do exist, they tend to be slow, excluded from common developer workflows, and of limited value because they cannot simulate the conditions of the production environment.
We believe the underlying cause is that data applications encode much of their business logic in heavy, external systems: processing engines like Spark and data warehouses such as Snowflake and Redshift. It is difficult to structure software to isolate these dependencies, and nearly impossible to run them in a lightweight manner.
This page demonstrates how Dagster addresses these challenges:
Principle: Errors that can be caught by unit tests should be caught by unit tests.
Corollary: Do not attempt to unit test for errors that unit tests cannot catch.
Using unit tests without keeping these principles in mind is why the data community frequently treats them with skepticism. Unit testing is too often interpreted as simulating an external system, such as Spark or a data warehouse, in a granular manner. Those systems are far too complex to emulate faithfully. Do not try to do so.
Unit tests are not acceptance tests. They should not be the judge of whether a computation is correct. However, unit testing -- when properly scoped -- is still valuable in data applications. There are massive classes of errors that we can address without interacting with external services and catch earlier in the process: refactoring errors, syntax errors in interpreted languages, configuration errors, graph structure errors, and so on. Errors caught in a fast feedback loop of unit testing can be addressed orders of magnitude faster than those caught during an expensive batch computation in staging or production.
So, unit tests should be viewed primarily as productivity and code quality tools that ultimately lead to more correct computations. Here we will demonstrate how Dagster conveniently enables unit tests.
The workhorse function for unit-testing a job is the JobDefinition.execute_in_process function. Using this function you can execute a job in process and then test execution properties using the ExecuteInProcessResult object that it returns.
```python
from dagster import ExecuteInProcessResult


def test_job():
    result = do_math_job.execute_in_process()

    # return type is ExecuteInProcessResult
    assert isinstance(result, ExecuteInProcessResult)
    assert result.success

    # inspect individual op results
    assert result.output_for_node("add_one") == 2
    assert result.output_for_node("add_two") == 3
    assert result.output_for_node("subtract") == -1
```
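For reference, `do_math_job` could be defined as follows. This is a minimal sketch, not the canonical definition; the op bodies are chosen only so that they produce the values asserted above:

```python
from dagster import job, op


@op
def add_one():
    return 1 + 1


@op
def add_two():
    return 1 + 2


@op
def subtract(left, right):
    # 2 - 3 == -1, matching the assertion in test_job
    return left - right


@job
def do_math_job():
    subtract(add_one(), add_two())
```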
You can find more unit test examples in the Examples section below.
As noted above, data applications often rely on heavy, external dependencies and encode much of their business logic in the code those dependencies execute. This makes it easy and natural to couple your application to a single operating environment. But if you do, any testing requires your production environment.
To make local testing possible, structure your software so that, as much as possible, business logic is cleanly separated from the operating environment. This is one of the reasons Dagster threads a context object through its entire computation.
Attached to the context is a set of user-defined resources. Examples of resources include APIs to data warehouses, Spark clusters, S3 sessions, and other external dependencies or services. Each job contains a set of resources, and multiple jobs can be defined from a given Dagster graph, one for each set of resources (production, local, testing, etc.).
For example, without this separation, skipping external dependencies in tests means constantly commenting and uncommenting code like:
```python
from dagster import op


@op
def get_data_without_resource(context):
    dummy_data = [1, 2, 3]
    # Do not call external apis in tests
    # return call_api()
    return dummy_data
```
Dagster allows you to define multiple "jobs" from the same computation graph. With resources, you can modify the op above to:
```python
from dagster import op, graph


@op(required_resource_keys={"api"})
def get_data(context):
    return context.resources.api.call()


@op
def do_something(context, data):
    output = process(data)
    return output


@graph
def download():
    do_something(get_data())


# The prod job for the download graph.
download_job = download.to_job(resource_defs={"api": api_client})
```
In this example, we define the business logic (i.e., the ops and graph) independent of any particular environment. From this computation graph, we then define a production job using the resources of our production environment.
This is extremely helpful when it comes to testing. Because the computation graph is not tied to any particular environment, we can execute it with mocked versions of its resources. To mock the api resource, we use the helper method mock_resource from the ResourceDefinition class.
```python
from dagster import ResourceDefinition


def test_local():
    # Since we have access to the computation graph independent of the set of
    # resources, we can test it locally.
    result = download.execute_in_process(
        resources={"api": ResourceDefinition.mock_resource()}
    )
    assert result.success


def run_in_prod():
    download_job.execute_in_process()
```
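For reference, the `api_client` resource passed to `to_job` above might look like the following. This is a hypothetical sketch; `ProdApiClient` is an illustrative stand-in for your real client, not part of Dagster:

```python
from dagster import resource


@resource
def api_client(_init_context):
    # Hypothetical: return the production client object exposing the
    # call() method used by get_data above.
    return ProdApiClient()
```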
For more information, check out the Resources section.
To unit-test an op that requires resources, we construct its context using build_op_context. Note how we can directly provide a resource instance, instead of having to create a mock resource definition.
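For reference, the op under test below might be defined like this (a minimal sketch consistent with the assertion that follows):

```python
from dagster import op


@op(required_resource_keys={"foo"})
def op_requires_foo(context):
    return f"found {context.resources.foo}"
```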
```python
from dagster import build_op_context


def test_op_with_context():
    context = build_op_context(resources={"foo": "bar"})
    assert op_requires_foo(context) == "found bar"
```
We can also provide a resource definition to build_op_context. It does not have a resource_config argument, so any config should be supplied to the resource via the ResourceDefinition.configured API, as in the sketch below.
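Here is a sketch of that pattern, assuming a resource that takes a single string in its config schema; `my_str` and `foo_resource` are illustrative names:

```python
from dagster import build_op_context, resource


@resource(config_schema={"my_str": str})
def foo_resource(init_context):
    return init_context.resource_config["my_str"]


def test_op_with_configured_resource():
    context = build_op_context(
        resources={"foo": foo_resource.configured({"my_str": "bar"})}
    )
    assert op_requires_foo(context) == "found bar"
```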
Similar to ops, @asset-decorated functions can be directly invoked. Doing so invokes the underlying op computation.
A basic asset, with no dependencies:
```python
from dagster import asset


@asset
def basic_asset():
    return 5


# An example unit test for basic_asset.
def test_basic_asset():
    assert basic_asset() == 5
```
An asset with dependencies:
```python
from dagster import asset


@asset
def asset_with_inputs(x, y):
    return x + y


# An example unit test for asset_with_inputs.
def test_asset_with_inputs():
    assert asset_with_inputs(5, 6) == 11
```
Any resources defined on your asset - either directly, or using with_resources - will be picked up when invoking the asset. When using resources, you must provide a context using build_op_context.
```python
from dagster import asset, resource, build_op_context, with_resources


@asset(required_resource_keys={"service"})
def asset_reqs_service(context):
    service = context.resources.service
    ...


@resource
def service():
    ...


# asset_with_service now has the resource service specified.
asset_with_service = with_resources([asset_reqs_service], {"service": service})[0]


def test_asset_with_service():
    # When invoking asset_with_service, the service resource will
    # automatically be used.
    result = asset_with_service(build_op_context())
    ...
```
Note that resources provided to build_op_context should not conflict with the resources provided directly on the asset. If a resource key is provided both to the context and to the asset, a DagsterInvalidInvocationError will be raised.
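To illustrate (a sketch reusing `asset_with_service` from above):

```python
from unittest import mock

import pytest
from dagster import DagsterInvalidInvocationError, build_op_context


def test_conflicting_resource_key():
    # "service" is already bound on asset_with_service via with_resources,
    # so providing it on the context as well raises an error.
    with pytest.raises(DagsterInvalidInvocationError):
        asset_with_service(build_op_context(resources={"service": mock.MagicMock()}))
```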
You may want to test multiple assets together, to more closely mirror actual materialization. This can be done using the materialize_to_memory function, which loads the materialized results of assets into memory:
```python
from dagster import asset, materialize_to_memory


@asset
def data_source():
    return get_data_from_source()


@asset
def structured_data(data_source):
    return extract_structured_data(data_source)


# An example unit test using materialize_to_memory
def test_data_assets():
    result = materialize_to_memory([data_source, structured_data])
    assert result.success

    # Materialized objects can be accessed in terms of the underlying op
    materialized_data = result.output_for_node("structured_data")
    ...
```
Mock resources can be provided directly to materialize_to_memory:
```python
from dagster import asset, resource, materialize_to_memory
import mock


@asset(required_resource_keys={"service"})
def asset_requires_service(context):
    service = context.resources.service
    ...


def test_asset_requires_service():
    # Mock objects can be provided directly.
    result = materialize_to_memory(
        [asset_requires_service],
        resources={"service": mock.MagicMock()},
    )
    assert result.success
    ...
```
The event stream is the most generic way that an op communicates what happened during its computation. Ops emit system events, such as step start and input/output type checking, as well as user-provided events such as expectations, materializations, and outputs.
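The test below assumes a job along the following lines. This is a minimal sketch, chosen only to be consistent with the run config and event assertions in the test, not the canonical definition:

```python
from dagster import AssetMaterialization, ExpectationResult, Out, Output, job, op


@op(out={"a_num": Out(int)})
def emit_events_op(input_num: int):
    a_num = input_num + 1
    yield ExpectationResult(success=a_num > 0, label="positive")
    yield AssetMaterialization(asset_key="persisted_string")
    yield Output(a_num, output_name="a_num")


@job
def emit_events_job():
    emit_events_op()
```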
```python
from dagster import AssetMaterialization, DagsterEventType, ExpectationResult


def test_event_stream():
    job_result = emit_events_job.execute_in_process(
        run_config={"ops": {"emit_events_op": {"inputs": {"input_num": 1}}}}
    )
    assert job_result.success

    # when one op has multiple outputs, you need to specify the output name
    assert job_result.output_for_node("emit_events_op", output_name="a_num") == 2

    events_for_step = job_result.events_for_node("emit_events_op")
    assert [se.event_type for se in events_for_step] == [
        DagsterEventType.STEP_START,
        DagsterEventType.STEP_INPUT,
        DagsterEventType.STEP_EXPECTATION_RESULT,
        DagsterEventType.ASSET_MATERIALIZATION,
        DagsterEventType.STEP_OUTPUT,
        DagsterEventType.HANDLED_OUTPUT,
        DagsterEventType.STEP_SUCCESS,
    ]

    # ops communicate what they did via the event stream, viewable in tools (e.g. dagit)
    (
        _start,
        _input_event,
        expectation_event,
        materialization_event,
        _num_output_event,
        _num_handled_output_operation,
        _success,
    ) = events_for_step

    # apologies for verboseness here! we can do better.
    expectation_result = expectation_event.event_specific_data.expectation_result
    assert isinstance(expectation_result, ExpectationResult)
    assert expectation_result.success
    assert expectation_result.label == "positive"

    materialization = materialization_event.event_specific_data.materialization
    assert isinstance(materialization, AssetMaterialization)
    assert materialization.label == "persisted_string"
```
To check if your Dagster code loads correctly, you can implement a unit test like the following:
```python
from .hello_world_repository import hello_world_repository


def test_repository_loads_all_definitions():
    """Asserts that the repository can load all definitions
    (jobs, assets, schedules, etc.) without errors.
    """
    hello_world_repository.load_all_definitions()
```
This test attempts to load a Dagster repository and its definitions - including jobs, assets, schedules, etc. - and fails if there's an issue. This can be useful for identifying compilation-level errors before deploying your code.
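For illustration, such a repository might be as small as the following; a hypothetical sketch of the hello_world_repository module imported above:

```python
# hello_world_repository.py (hypothetical module for the test above)
from dagster import job, op, repository


@op
def hello():
    return "Hello, world!"


@job
def hello_world_job():
    hello()


@repository
def hello_world_repository():
    return [hello_world_job]
```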