Source code for dagstermill.context

from typing import AbstractSet, Any, Mapping, Optional, cast

from dagster import DagsterRun, JobDefinition, OpDefinition
from dagster import _check as check
from dagster._annotations import public
from dagster._core.definitions.dependency import Node, NodeHandle
from dagster._core.execution.context.compute import AbstractComputeExecutionContext
from dagster._core.execution.context.system import PlanExecutionContext, StepExecutionContext
from dagster._core.log_manager import DagsterLogManager
from dagster._core.system_config.objects import ResolvedRunConfig
from dagster._legacy import PipelineDefinition, PipelineRun, SolidDefinition
from dagster._utils.backcompat import deprecation_warning


[docs]class DagstermillExecutionContext(AbstractComputeExecutionContext): """Dagstermill-specific execution context. Do not initialize directly: use :func:`dagstermill.get_context`. """ def __init__( self, pipeline_context: PlanExecutionContext, pipeline_def: PipelineDefinition, resource_keys_to_init: AbstractSet[str], solid_name: str, solid_handle: NodeHandle, solid_config: Any = None, ): self._pipeline_context = check.inst_param( pipeline_context, "pipeline_context", PlanExecutionContext ) self._pipeline_def = check.inst_param(pipeline_def, "pipeline_def", PipelineDefinition) self._resource_keys_to_init = check.set_param( resource_keys_to_init, "resource_keys_to_init", of_type=str ) self.solid_name = check.str_param(solid_name, "solid_name") self.solid_handle = check.inst_param(solid_handle, "solid_handle", NodeHandle) self._solid_config = solid_config def has_tag(self, key: str) -> bool: """Check if a logging tag is defined on the context. Args: key (str): The key to check. Returns: bool """ check.str_param(key, "key") return self._pipeline_context.has_tag(key) def get_tag(self, key: str) -> Optional[str]: """Get a logging tag defined on the context. Args: key (str): The key to get. Returns: str """ check.str_param(key, "key") return self._pipeline_context.get_tag(key) @public # type: ignore @property def run_id(self) -> str: """str: The run_id for the context.""" return self._pipeline_context.run_id @public # type: ignore @property def run_config(self) -> Mapping[str, Any]: """dict: The run_config for the context.""" return self._pipeline_context.run_config @property def resolved_run_config(self) -> ResolvedRunConfig: """:class:`dagster.ResolvedRunConfig`: The resolved_run_config for the context""" return self._pipeline_context.resolved_run_config @public # type: ignore @property def logging_tags(self) -> Mapping[str, str]: """dict: The logging tags for the context.""" return self._pipeline_context.logging_tags @public # type: ignore @property def job_name(self) -> str: return self._pipeline_context.job_name @property def pipeline_name(self) -> str: deprecation_warning( "DagstermillExecutionContext.pipeline_name", "0.17.0", "use the 'job_name' property instead.", ) return self.job_name @public # type: ignore @property def job_def(self) -> JobDefinition: """:class:`dagster.JobDefinition`: The job definition for the context. This will be a dagstermill-specific shim. """ return cast( JobDefinition, check.inst( self._pipeline_def, JobDefinition, "Accessing job_def inside a legacy pipeline. Use pipeline_def instead.", ), ) @property def pipeline_def(self) -> PipelineDefinition: """:class:`dagster.PipelineDefinition`: The pipeline definition for the context. This will be a dagstermill-specific shim. """ deprecation_warning( "DagstermillExecutionContext.pipeline_def", "0.17.0", "use the 'job_def' property instead.", ) return self._pipeline_def @property def resources(self) -> Any: """collections.namedtuple: A dynamically-created type whose properties allow access to resources.""" return self._pipeline_context.scoped_resources_builder.build( required_resource_keys=self._resource_keys_to_init, ) @public # type: ignore @property def run(self) -> DagsterRun: """:class:`dagster.DagsterRun`: The job run for the context.""" return cast(DagsterRun, self._pipeline_context.pipeline_run) @property def pipeline_run(self) -> PipelineRun: deprecation_warning( "DagstermillExecutionContext.pipeline_run", "0.17.0", "use the 'run' property instead.", ) return self.run @property def log(self) -> DagsterLogManager: """:class:`dagster.DagsterLogManager`: The log manager for the context. Call, e.g., ``log.info()`` to log messages through the Dagster machinery. """ return self._pipeline_context.log @public # type: ignore @property def op_def(self) -> OpDefinition: """:class:`dagster.OpDefinition`: The op definition for the context. In interactive contexts, this may be a dagstermill-specific shim, depending whether an op definition was passed to ``dagstermill.get_context``. """ return cast(OpDefinition, self._pipeline_def.solid_def_named(self.solid_name)) @property def solid_def(self) -> SolidDefinition: """:class:`dagster.SolidDefinition`: The solid definition for the context. In interactive contexts, this may be a dagstermill-specific shim, depending whether a solid definition was passed to ``dagstermill.get_context``. """ deprecation_warning( "DagstermillExecutionContext.solid_def", "0.17.0", "use the 'op_def' property instead.", ) return cast(SolidDefinition, self._pipeline_def.solid_def_named(self.solid_name)) @property def solid(self) -> Node: """:class:`dagster.Node`: The solid for the context. In interactive contexts, this may be a dagstermill-specific shim, depending whether a solid definition was passed to ``dagstermill.get_context``. """ deprecation_warning( "DagstermillExecutionContext.solid_def", "0.17.0", "use the 'op_def' property instead.", ) return self.pipeline_def.get_solid(self.solid_handle) @public # type: ignore @property def op_config(self) -> Any: """collections.namedtuple: A dynamically-created type whose properties allow access to op-specific config.""" if self._solid_config: return self._solid_config solid_config = self.resolved_run_config.solids.get(self.solid_name) return solid_config.config if solid_config else None @property def solid_config(self) -> Any: """collections.namedtuple: A dynamically-created type whose properties allow access to solid-specific config.""" deprecation_warning( "DagstermillExecutionContext.solid_config", "0.17.0", "use the 'op_config' property instead.", ) return self.op_config
class DagstermillRuntimeExecutionContext(DagstermillExecutionContext): def __init__( self, pipeline_context: PlanExecutionContext, pipeline_def: PipelineDefinition, resource_keys_to_init: AbstractSet[str], solid_name: str, step_context: StepExecutionContext, solid_handle: NodeHandle, solid_config: Any = None, ): self._step_context = check.inst_param(step_context, "step_context", StepExecutionContext) super().__init__( pipeline_context, pipeline_def, resource_keys_to_init, solid_name, solid_handle, solid_config, )