Source code for dagster._core.definitions.hook_definition

from typing import AbstractSet, Any, Callable, Iterator, NamedTuple, Optional, cast

import dagster._check as check
from dagster._annotations import PublicAttr

from ..decorator_utils import get_function_params
from ..errors import DagsterInvalidInvocationError
from .resource_requirement import HookResourceRequirement, RequiresResources, ResourceRequirement
from .utils import check_valid_name


[docs]class HookDefinition( NamedTuple( "_HookDefinition", [ ("name", PublicAttr[str]), ("hook_fn", PublicAttr[Callable]), ("required_resource_keys", PublicAttr[AbstractSet[str]]), ("decorated_fn", PublicAttr[Optional[Callable]]), ], ), RequiresResources, ): """Define a hook which can be triggered during a op execution (e.g. a callback on the step execution failure event during a op execution). Args: name (str): The name of this hook. hook_fn (Callable): The callback function that will be triggered. required_resource_keys (Optional[AbstractSet[str]]): Keys for the resources required by the hook. """ def __new__( cls, *, name: str, hook_fn: Callable[..., Any], required_resource_keys: Optional[AbstractSet[str]] = None, decorated_fn: Optional[Callable[..., Any]] = None, ): return super(HookDefinition, cls).__new__( cls, name=check_valid_name(name), hook_fn=check.callable_param(hook_fn, "hook_fn"), required_resource_keys=frozenset( check.opt_set_param(required_resource_keys, "required_resource_keys", of_type=str) ), decorated_fn=check.opt_callable_param(decorated_fn, "decorated_fn"), ) def __call__(self, *args, **kwargs): """This is invoked when the hook is used as a decorator. We currently support hooks to decorate the following: - PipelineDefinition: when the hook decorates a job definition, it will be added to all the op invocations within the job. Example: .. code-block:: python @success_hook def slack_message_on_success(_): ... @slack_message_on_success @job def a_job(): foo(bar()) """ from ..execution.context.hook import HookContext from .graph_definition import GraphDefinition from .hook_invocation import hook_invocation_result from .pipeline_definition import PipelineDefinition if len(args) > 0 and isinstance(args[0], (PipelineDefinition, GraphDefinition)): # when it decorates a pipeline, we apply this hook to all the solid invocations within # the pipeline. return args[0].with_hooks({self}) else: if not self.decorated_fn: raise DagsterInvalidInvocationError( "Only hook definitions created using one of the hook decorators can be invoked." ) fxn_args = get_function_params(self.decorated_fn) # If decorated fxn has two arguments, then this is an event list hook fxn, and parameter # names are always context and event_list if len(fxn_args) == 2: context_arg_name = fxn_args[0].name event_list_arg_name = fxn_args[1].name if len(args) + len(kwargs) != 2: raise DagsterInvalidInvocationError( "Decorated function expects two parameters, context and event_list, but " f"{len(args) + len(kwargs)} were provided." ) if args: context = check.opt_inst_param(args[0], "context", HookContext) event_list = check.opt_list_param( args[1] if len(args) > 1 else kwargs[event_list_arg_name], event_list_arg_name, ) else: if context_arg_name not in kwargs: raise DagsterInvalidInvocationError( f"Could not find expected argument '{context_arg_name}'. Provided " f"kwargs: {list(kwargs.keys())}" ) if event_list_arg_name not in kwargs: raise DagsterInvalidInvocationError( f"Could not find expected argument '{event_list_arg_name}'. Provided " f"kwargs: {list(kwargs.keys())}" ) context = check.opt_inst_param( kwargs[context_arg_name], context_arg_name, HookContext ) event_list = check.opt_list_param( kwargs[event_list_arg_name], event_list_arg_name ) return hook_invocation_result(self, context, event_list) else: context_arg_name = fxn_args[0].name if len(args) + len(kwargs) != 1: raise DagsterInvalidInvocationError( f"Decorated function expects one parameter, {context_arg_name}, but " f"{len(args) + len(kwargs)} were provided." ) if args: context = check.opt_inst_param(args[0], context_arg_name, HookContext) else: if context_arg_name not in kwargs: raise DagsterInvalidInvocationError( f"Could not find expected argument '{context_arg_name}'. Provided " f"kwargs: {list(kwargs.keys())}" ) context = check.opt_inst_param( kwargs[context_arg_name], context_arg_name, HookContext ) return hook_invocation_result(self, context) def get_resource_requirements( self, outer_context: Optional[object] = None ) -> Iterator[ResourceRequirement]: # outer_context in this case is a string of (pipeline/job, pipeline/job name) or (node, node name) attached_to = cast(Optional[str], outer_context) for resource_key in sorted(list(self.required_resource_keys)): yield HookResourceRequirement( key=resource_key, attached_to=attached_to, hook_name=self.name )