A @success_hook or @failure_hook decorated function is called an op hook. Op hooks are designed for generic purposes — it can be anything you would like to do at a per op level.
For example, you want to send a slack message to a channel when any op fails in a job. In this case, we will be applying a hook on a job, which will apply the hook on every op instance within in that job.
The @job decorator accepts hooks as a parameter. Likewise, when creating a job from a graph, hooks are also accepted as a parameter in the GraphDefinition.to_job function. In the below example, we can pass the slack_message_on_failure hook above in a set as a parameter to @job. Then, slack messages will be sent when any op in the job fails.
@job(resource_defs={"slack": slack_resource}, hooks={slack_message_on_failure})defnotif_all():# the hook "slack_message_on_failure" is applied on every op instance within this graph
a()
b()
When you run this job, you can provide configuration to the slack resource in the run config:
resources:slack:config:token:"xoxp-1234123412341234-12341234-1234"# replace with your slack token
@job(
resource_defs={"slack": slack_resource.configured({"token":"xoxp-1234123412341234-12341234-1234"})},
hooks={slack_message_on_failure},)defnotif_all_configured():# the hook "slack_message_on_failure" is applied on every op instance within this graph
a()
b()
Sometimes a job is a shared responsibility or you only want to be alerted on high-priority op executions. So we also provide a way to set up hooks on op instances which enables you to apply policies on a per-op basis.
@job(resource_defs={"slack": slack_resource})defselective_notif():# only op "a" triggers hooks: a slack message will be sent when it fails or succeeds
a.with_hooks({slack_message_on_failure, slack_message_on_success})()# op "b" won't trigger any hooks
b()
In this case, op "b" won't trigger any hooks, while when op "a" fails or succeeds it will send a slack message.
You can test the functionality of a hook by invoking the hook definition. This will run the underlying decorated function. You can construct a context to provide to the invocation using the build_hook_context function.
In many cases, you might want to know details about an op failure. You can get the exception object thrown in the failed op via the op_exception property on HookContext:
from dagster import HookContext, failure_hook
import traceback
@failure_hookdefmy_failure_hook(context: HookContext):
op_exception: BaseException = context.op_exception
# print stack trace of exception
traceback.print_tb(op_exception.__traceback__)
Hooks use resource keys to access resources. After including the resource key in its set of required_resource_keys, the body of the hook can access the corresponding resource via the resources attribute of its context object.
It also enables you to switch resource values in different jobs so that, for example, you can send slack messages only while executing a production job and mock the slack resource while testing.
Because executing a production job and a testing job share the same core of business logic, we can build these jobs from a shared graph. In the GraphDefinition.to_job method, which builds a job from a graph, you can specify environment-specific hooks and resources.
In this case, we can mock the slack_resource using a helper function ResourceDefinition.hardcoded_resource(), so it won't send slack messages during development.
When we switch to production, we can provide the real slack token in the run_config and therefore enable sending messages to a certain slack channel when a hook is triggered.
resources:slack:config:token:"xoxp-1234123412341234-12341234-1234"# replace with your slack token
Then, we can execute a job with the config through Python API, CLI, or the Dagit UI. Here's an example of using the Python API.
When you add a hook to a job, the hook will be added to every op in the job individually. The hook does not track job-scoped events and only tracks op-level success or failure events.
You may find the need to set up job-level policies. For example, you may want to run some code for every job failure.
Dagster provides a way to create a sensor that reacts to job failure events. You can find details at Run failure sensor on the Sensors page.