Source code for dagster._core.storage.pipeline_run

import warnings
from datetime import datetime
from enum import Enum
from inspect import Parameter
from typing import (
    TYPE_CHECKING,
    Any,
    Dict,
    FrozenSet,
    List,
    Mapping,
    NamedTuple,
    Optional,
    Sequence,
    Type,
    Union,
)

import dagster._check as check
from dagster._annotations import public
from dagster._core.definitions.events import AssetKey
from dagster._core.origin import PipelinePythonOrigin
from dagster._core.storage.tags import PARENT_RUN_ID_TAG, ROOT_RUN_ID_TAG
from dagster._core.utils import make_new_run_id
from dagster._serdes.serdes import (
    DefaultNamedTupleSerializer,
    EnumSerializer,
    WhitelistMap,
    register_serdes_enum_fallbacks,
    register_serdes_tuple_fallbacks,
    replace_storage_keys,
    unpack_inner_value,
    whitelist_for_serdes,
)

from .tags import (
    BACKFILL_ID_TAG,
    PARTITION_NAME_TAG,
    PARTITION_SET_TAG,
    REPOSITORY_LABEL_TAG,
    RESUME_RETRY_TAG,
    SCHEDULE_NAME_TAG,
    SENSOR_NAME_TAG,
)

if TYPE_CHECKING:
    from dagster._core.definitions.repository_definition import RepositoryLoadData
    from dagster._core.host_representation.origin import ExternalPipelineOrigin


class DagsterRunStatusSerializer(EnumSerializer):
    @classmethod
    def value_from_storage_str(cls, storage_str: str, klass: Type) -> Enum:
        return getattr(klass, storage_str)

    @classmethod
    def value_to_storage_str(
        cls, value: Enum, whitelist_map: WhitelistMap, descent_path: str
    ) -> str:
        enum_value = value.value
        # Store DagsterRunStatus with backcompat name PipelineRunStatus
        backcompat_name = "PipelineRunStatus"
        return ".".join([backcompat_name, enum_value])


@whitelist_for_serdes(serializer=DagsterRunStatusSerializer)
class DagsterRunStatus(Enum):
    """The status of run execution."""

    # Runs waiting to be launched by the Dagster Daemon.
    QUEUED = "QUEUED"

    # Runs that have been launched, but execution has not yet started.
    NOT_STARTED = "NOT_STARTED"

    # Runs that are managed outside of the Dagster control plane.
    MANAGED = "MANAGED"

    # Runs that have been launched, but execution has not yet started.
    STARTING = "STARTING"

    # Runs that have been launched and execution has started.
    STARTED = "STARTED"

    # Runs that have successfully completed.
    SUCCESS = "SUCCESS"

    # Runs that have failed to complete.
    FAILURE = "FAILURE"

    # Runs that are in-progress and pending to be canceled.
    CANCELING = "CANCELING"

    # Runs that have been canceled before completion.
    CANCELED = "CANCELED"
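
# Illustrative sketch (not part of the module source): with the serializer above, statuses are
# written using the legacy enum name and resolved back by attribute lookup. `whitelist_map` here
# stands in for whatever WhitelistMap the serdes layer passes; it is unused by this serializer.
#
#   DagsterRunStatusSerializer.value_to_storage_str(
#       DagsterRunStatus.SUCCESS, whitelist_map, descent_path=""
#   )  # -> "PipelineRunStatus.SUCCESS"
#
#   DagsterRunStatusSerializer.value_from_storage_str("SUCCESS", DagsterRunStatus)
#   # -> DagsterRunStatus.SUCCESS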

PipelineRunStatus = DagsterRunStatus
register_serdes_enum_fallbacks({"PipelineRunStatus": DagsterRunStatus})

# These statuses indicate that a run may be using compute resources
IN_PROGRESS_RUN_STATUSES = [
    PipelineRunStatus.STARTING,
    PipelineRunStatus.STARTED,
    PipelineRunStatus.CANCELING,
]

# This serves as an explicit list of run statuses that indicate that the run is not using compute
# resources. This and the list above should cover all run statuses.
NON_IN_PROGRESS_RUN_STATUSES = [
    PipelineRunStatus.QUEUED,
    PipelineRunStatus.NOT_STARTED,
    PipelineRunStatus.SUCCESS,
    PipelineRunStatus.FAILURE,
    PipelineRunStatus.MANAGED,
    PipelineRunStatus.CANCELED,
]

FINISHED_STATUSES = [
    PipelineRunStatus.SUCCESS,
    PipelineRunStatus.FAILURE,
    PipelineRunStatus.CANCELED,
]


@whitelist_for_serdes
class PipelineRunStatsSnapshot(
    NamedTuple(
        "_PipelineRunStatsSnapshot",
        [
            ("run_id", str),
            ("steps_succeeded", int),
            ("steps_failed", int),
            ("materializations", int),
            ("expectations", int),
            ("enqueued_time", Optional[float]),
            ("launch_time", Optional[float]),
            ("start_time", Optional[float]),
            ("end_time", Optional[float]),
        ],
    )
):
    def __new__(
        cls,
        run_id: str,
        steps_succeeded: int,
        steps_failed: int,
        materializations: int,
        expectations: int,
        enqueued_time: Optional[float],
        launch_time: Optional[float],
        start_time: Optional[float],
        end_time: Optional[float],
    ):
        return super(PipelineRunStatsSnapshot, cls).__new__(
            cls,
            run_id=check.str_param(run_id, "run_id"),
            steps_succeeded=check.int_param(steps_succeeded, "steps_succeeded"),
            steps_failed=check.int_param(steps_failed, "steps_failed"),
            materializations=check.int_param(materializations, "materializations"),
            expectations=check.int_param(expectations, "expectations"),
            enqueued_time=check.opt_float_param(enqueued_time, "enqueued_time"),
            launch_time=check.opt_float_param(launch_time, "launch_time"),
            start_time=check.opt_float_param(start_time, "start_time"),
            end_time=check.opt_float_param(end_time, "end_time"),
        )


class DagsterRunSerializer(DefaultNamedTupleSerializer):
    @classmethod
    def value_from_storage_dict(
        cls,
        storage_dict,
        klass,
        args_for_class,
        whitelist_map,
        descent_path,
    ):
        # unpack all stored fields
        unpacked_dict = {
            key: unpack_inner_value(value, whitelist_map, f"{descent_path}.{key}")
            for key, value in storage_dict.items()
        }
        # called by the serdes layer, delegates to helper method with expanded kwargs
        return pipeline_run_from_storage(**unpacked_dict)

    @classmethod
    def value_to_storage_dict(
        cls,
        value: NamedTuple,
        whitelist_map: WhitelistMap,
        descent_path: str,
    ) -> Dict[str, Any]:
        storage = super().value_to_storage_dict(
            value,
            whitelist_map,
            descent_path,
        )
        # persist using legacy name PipelineRun
        storage["__class__"] = "PipelineRun"
        return storage


def pipeline_run_from_storage(
    pipeline_name=None,
    run_id=None,
    run_config=None,
    mode=None,
    asset_selection=None,
    solid_selection=None,
    solids_to_execute=None,
    step_keys_to_execute=None,
    status=None,
    tags=None,
    root_run_id=None,
    parent_run_id=None,
    pipeline_snapshot_id=None,
    execution_plan_snapshot_id=None,
    # backcompat
    environment_dict=None,
    previous_run_id=None,
    selector=None,
    solid_subset=None,
    reexecution_config=None,  # pylint: disable=unused-argument
    external_pipeline_origin=None,
    pipeline_code_origin=None,
    has_repository_load_data=None,
    **kwargs,
):
    # serdes log
    # * removed reexecution_config - serdes logic expected to strip unknown keys so no need to preserve
    # * added pipeline_snapshot_id
    # * renamed previous_run_id -> parent_run_id, added root_run_id
    # * added execution_plan_snapshot_id
    # * removed selector
    # * added solid_subset
    # * renamed solid_subset -> solid_selection, added solids_to_execute
    # * renamed environment_dict -> run_config
    # * added asset_selection
    # * added has_repository_load_data

    # back compat for environment dict => run_config
    if environment_dict:
        check.invariant(
            not run_config,
            "Cannot set both run_config and environment_dict. Use run_config parameter.",
        )
        run_config = environment_dict

    # back compat for previous_run_id => parent_run_id, root_run_id
    if previous_run_id and not (parent_run_id and root_run_id):
        parent_run_id = previous_run_id
        root_run_id = previous_run_id

    # back compat for selector => pipeline_name, solids_to_execute
    selector = check.opt_inst_param(selector, "selector", ExecutionSelector)
    if selector:
        check.invariant(
            pipeline_name is None or selector.name == pipeline_name,
            (
                "Conflicting pipeline name {pipeline_name} in arguments to PipelineRun: "
                "selector was passed with pipeline {selector_pipeline}".format(
                    pipeline_name=pipeline_name, selector_pipeline=selector.name
                )
            ),
        )
        if pipeline_name is None:
            pipeline_name = selector.name

        check.invariant(
            solids_to_execute is None or set(selector.solid_subset) == solids_to_execute,
            (
                "Conflicting solids_to_execute {solids_to_execute} in arguments to PipelineRun: "
                "selector was passed with subset {selector_subset}".format(
                    solids_to_execute=solids_to_execute, selector_subset=selector.solid_subset
                )
            ),
        )
        # for old runs that only have selector but no solids_to_execute
        if solids_to_execute is None:
            solids_to_execute = (
                frozenset(selector.solid_subset) if selector.solid_subset else None
            )

    # back compat for solid_subset => solids_to_execute
    check.opt_list_param(solid_subset, "solid_subset", of_type=str)
    if solid_subset:
        solids_to_execute = frozenset(solid_subset)

    # warn about unused arguments
    if len(kwargs):
        warnings.warn(
            "Found unhandled arguments from stored PipelineRun: {args}".format(args=kwargs.keys())
        )

    return DagsterRun(  # pylint: disable=redundant-keyword-arg
        pipeline_name=pipeline_name,
        run_id=run_id,
        run_config=run_config,
        mode=mode,
        asset_selection=asset_selection,
        solid_selection=solid_selection,
        solids_to_execute=solids_to_execute,
        step_keys_to_execute=step_keys_to_execute,
        status=status,
        tags=tags,
        root_run_id=root_run_id,
        parent_run_id=parent_run_id,
        pipeline_snapshot_id=pipeline_snapshot_id,
        execution_plan_snapshot_id=execution_plan_snapshot_id,
        external_pipeline_origin=external_pipeline_origin,
        pipeline_code_origin=pipeline_code_origin,
        has_repository_load_data=has_repository_load_data,
    )


class PipelineRun(
    NamedTuple(
        "_PipelineRun",
        [
            ("pipeline_name", str),
            ("run_id", str),
            ("run_config", Mapping[str, object]),
            ("mode", Optional[str]),
            ("asset_selection", Optional[FrozenSet[AssetKey]]),
            ("solid_selection", Optional[Sequence[str]]),
            ("solids_to_execute", Optional[FrozenSet[str]]),
            ("step_keys_to_execute", Optional[Sequence[str]]),
            ("status", PipelineRunStatus),
            ("tags", Mapping[str, str]),
            ("root_run_id", Optional[str]),
            ("parent_run_id", Optional[str]),
            ("pipeline_snapshot_id", Optional[str]),
            ("execution_plan_snapshot_id", Optional[str]),
            ("external_pipeline_origin", Optional["ExternalPipelineOrigin"]),
            ("pipeline_code_origin", Optional[PipelinePythonOrigin]),
            ("has_repository_load_data", bool),
        ],
    )
):
    """Serializable internal representation of a pipeline run, as stored in a
    :py:class:`~dagster._core.storage.runs.RunStorage`.
""" def __new__( cls, pipeline_name: str, run_id: Optional[str] = None, run_config: Optional[Mapping[str, object]] = None, mode: Optional[str] = None, asset_selection: Optional[FrozenSet[AssetKey]] = None, solid_selection: Optional[Sequence[str]] = None, solids_to_execute: Optional[FrozenSet[str]] = None, step_keys_to_execute: Optional[Sequence[str]] = None, status: Optional[PipelineRunStatus] = None, tags: Optional[Mapping[str, str]] = None, root_run_id: Optional[str] = None, parent_run_id: Optional[str] = None, pipeline_snapshot_id: Optional[str] = None, execution_plan_snapshot_id: Optional[str] = None, external_pipeline_origin: Optional["ExternalPipelineOrigin"] = None, pipeline_code_origin: Optional[PipelinePythonOrigin] = None, has_repository_load_data: Optional[bool] = None, ): check.invariant( (root_run_id is not None and parent_run_id is not None) or (root_run_id is None and parent_run_id is None), ( "Must set both root_run_id and parent_run_id when creating a PipelineRun that " "belongs to a run group" ), ) # a frozenset which contains the names of the solids to execute solids_to_execute = check.opt_nullable_set_param( solids_to_execute, "solids_to_execute", of_type=str ) # a list of solid queries provided by the user # possible to be None when only solids_to_execute is set by the user directly solid_selection = check.opt_nullable_sequence_param( solid_selection, "solid_selection", of_type=str ) check.opt_nullable_sequence_param(step_keys_to_execute, "step_keys_to_execute", of_type=str) asset_selection = check.opt_nullable_set_param( asset_selection, "asset_selection", of_type=AssetKey ) # Placing this with the other imports causes a cyclic import # https://github.com/dagster-io/dagster/issues/3181 from dagster._core.host_representation.origin import ExternalPipelineOrigin if status == PipelineRunStatus.QUEUED: check.inst_param( external_pipeline_origin, "external_pipeline_origin", ExternalPipelineOrigin, "external_pipeline_origin is required for queued runs", ) if run_id is None: run_id = make_new_run_id() return super(PipelineRun, cls).__new__( cls, pipeline_name=check.str_param(pipeline_name, "pipeline_name"), run_id=check.str_param(run_id, "run_id"), run_config=check.opt_mapping_param(run_config, "run_config", key_type=str), mode=check.opt_str_param(mode, "mode"), solid_selection=solid_selection, asset_selection=asset_selection, solids_to_execute=solids_to_execute, step_keys_to_execute=step_keys_to_execute, status=check.opt_inst_param( status, "status", PipelineRunStatus, PipelineRunStatus.NOT_STARTED ), tags=check.opt_mapping_param(tags, "tags", key_type=str, value_type=str), root_run_id=check.opt_str_param(root_run_id, "root_run_id"), parent_run_id=check.opt_str_param(parent_run_id, "parent_run_id"), pipeline_snapshot_id=check.opt_str_param(pipeline_snapshot_id, "pipeline_snapshot_id"), execution_plan_snapshot_id=check.opt_str_param( execution_plan_snapshot_id, "execution_plan_snapshot_id" ), external_pipeline_origin=check.opt_inst_param( external_pipeline_origin, "external_pipeline_origin", ExternalPipelineOrigin ), pipeline_code_origin=check.opt_inst_param( pipeline_code_origin, "pipeline_code_origin", PipelinePythonOrigin ), has_repository_load_data=check.opt_bool_param( has_repository_load_data, "has_repository_load_data", default=False ), ) def with_status(self, status): if status == PipelineRunStatus.QUEUED: # Placing this with the other imports causes a cyclic import # https://github.com/dagster-io/dagster/issues/3181 from dagster._core.host_representation.origin 

            check.inst(
                self.external_pipeline_origin,
                ExternalPipelineOrigin,
                "external_pipeline_origin is required for queued runs",
            )

        return self._replace(status=status)

    def with_job_origin(self, origin):
        from dagster._core.host_representation.origin import ExternalPipelineOrigin

        check.inst_param(origin, "origin", ExternalPipelineOrigin)
        return self._replace(external_pipeline_origin=origin)

    def with_mode(self, mode):
        return self._replace(mode=mode)

    def with_tags(self, tags):
        return self._replace(tags=tags)

    def get_root_run_id(self):
        return self.tags.get(ROOT_RUN_ID_TAG)

    def get_parent_run_id(self):
        return self.tags.get(PARENT_RUN_ID_TAG)

    def tags_for_storage(self):
        repository_tags = {}
        if self.external_pipeline_origin:
            # tag the run with a label containing the repository name / location name, to allow
            # for per-repository filtering of runs from dagit.
            repository_tags[
                REPOSITORY_LABEL_TAG
            ] = self.external_pipeline_origin.external_repository_origin.get_label()

        if not self.tags:
            return repository_tags

        return {**repository_tags, **self.tags}

    @public  # type: ignore
    @property
    def is_finished(self):
        return self.status in FINISHED_STATUSES

    @public  # type: ignore
    @property
    def is_success(self):
        return self.status == PipelineRunStatus.SUCCESS

    @public  # type: ignore
    @property
    def is_failure(self):
        return self.status == PipelineRunStatus.FAILURE

    @public  # type: ignore
    @property
    def is_failure_or_canceled(self):
        return (
            self.status == PipelineRunStatus.FAILURE
            or self.status == PipelineRunStatus.CANCELED
        )

    @public  # type: ignore
    @property
    def is_resume_retry(self):
        return self.tags.get(RESUME_RETRY_TAG) == "true"

    @property
    def previous_run_id(self):
        # Compat
        return self.parent_run_id

    @public  # type: ignore
    @property
    def job_name(self) -> str:
        return self.pipeline_name

    @staticmethod
    def tags_for_schedule(schedule):
        return {SCHEDULE_NAME_TAG: schedule.name}

    @staticmethod
    def tags_for_sensor(sensor):
        return {SENSOR_NAME_TAG: sensor.name}

    @staticmethod
    def tags_for_backfill_id(backfill_id):
        return {BACKFILL_ID_TAG: backfill_id}

    @staticmethod
    def tags_for_partition_set(partition_set, partition):
        from dagster._core.definitions.multi_dimensional_partitions import (
            MultiPartitionKey,
            get_tags_from_multi_partition_key,
        )

        tags = {PARTITION_SET_TAG: partition_set.name}
        if isinstance(partition.name, MultiPartitionKey):
            tags.update(get_tags_from_multi_partition_key(partition.name))
        else:
            tags[PARTITION_NAME_TAG] = partition.name
        return tags
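
# Illustrative sketch (not part of the module source): constructing a run with only a
# pipeline_name relies on the defaults applied in __new__ above — a fresh run_id from
# make_new_run_id() and a status of NOT_STARTED. The static tag helpers return plain dicts keyed
# by the tag constants imported at the top of this module. "my_job" and "abc123" are placeholder
# values.
#
#   run = PipelineRun(pipeline_name="my_job")
#   run.status                                   # DagsterRunStatus.NOT_STARTED
#   run.is_finished                              # False
#   PipelineRun.tags_for_backfill_id("abc123")   # {BACKFILL_ID_TAG: "abc123"}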

@whitelist_for_serdes(serializer=DagsterRunSerializer)
class DagsterRun(PipelineRun):
    """Serializable internal representation of a dagster run, as stored in a
    :py:class:`~dagster._core.storage.runs.RunStorage`.

    Subclasses PipelineRun for backcompat purposes. DagsterRun is the actual initialized class
    used throughout the system.
    """
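
# Illustrative sketch (not part of the module source): DagsterRunSerializer writes a DagsterRun
# to a storage dict whose "__class__" key is "PipelineRun", and register_serdes_tuple_fallbacks
# below maps that legacy name back to DagsterRun on read, so old and new payloads deserialize to
# the same class. `run` and `whitelist_map` are placeholders for a DagsterRun instance and the
# serdes WhitelistMap.
#
#   storage = DagsterRunSerializer.value_to_storage_dict(run, whitelist_map, descent_path="")
#   storage["__class__"]  # "PipelineRun"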

# DagsterRun is serialized as PipelineRun so that it can be read by older (pre 0.13.x) versions
# of Dagster, but is read back in as a DagsterRun.
register_serdes_tuple_fallbacks({"PipelineRun": DagsterRun})


class RunsFilterSerializer(DefaultNamedTupleSerializer):
    @classmethod
    def value_to_storage_dict(
        cls,
        value: NamedTuple,
        whitelist_map: WhitelistMap,
        descent_path: str,
    ) -> Dict[str, Any]:
        storage = super().value_to_storage_dict(
            value,
            whitelist_map,
            descent_path,
        )
        # For backcompat, we store:
        # job_name as pipeline_name
        return replace_storage_keys(storage, {"job_name": "pipeline_name"})

    @classmethod
    def value_from_storage_dict(
        cls,
        storage_dict: Dict[str, Any],
        klass: Type,
        args_for_class: Mapping[str, Parameter],
        whitelist_map: WhitelistMap,
        descent_path: str,
    ) -> NamedTuple:
        # We store empty run ids as [] but only accept None
        if "run_ids" in storage_dict and storage_dict["run_ids"] == []:
            storage_dict["run_ids"] = None
        return super().value_from_storage_dict(
            storage_dict, klass, args_for_class, whitelist_map, descent_path
        )
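
# Illustrative sketch (not part of the module source): the serializer above renames keys and
# normalizes run_ids when persisting the RunsFilter defined below — a filter's job_name is stored
# under the legacy "pipeline_name" key, and a stored run_ids value of [] is read back as None
# before RunsFilter.__new__ rejects empty lists. `runs_filter` and `whitelist_map` are
# placeholders.
#
#   storage = RunsFilterSerializer.value_to_storage_dict(runs_filter, whitelist_map, "")
#   "pipeline_name" in storage   # True; "job_name" itself is not persisted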

@whitelist_for_serdes(serializer=RunsFilterSerializer)
class RunsFilter(
    NamedTuple(
        "_RunsFilter",
        [
            ("run_ids", Sequence[str]),
            ("job_name", Optional[str]),
            ("statuses", Sequence[DagsterRunStatus]),
            ("tags", Mapping[str, Union[str, Sequence[str]]]),
            ("snapshot_id", Optional[str]),
            ("updated_after", Optional[datetime]),
            ("mode", Optional[str]),
            ("created_before", Optional[datetime]),
        ],
    )
):
    """Defines a filter across job runs, for use when querying storage directly.

    The fields of a RunsFilter are combined with a logical AND. For example, if you specify both
    job_name and tags, you will receive only runs with the specified job_name AND the specified
    tags. If a field is left blank, all values are permitted for that field.

    Args:
        run_ids (Optional[List[str]]): A list of job run_id values.
        job_name (Optional[str]): Name of the job to query for. If blank, all job_names will be
            accepted.
        statuses (Optional[List[DagsterRunStatus]]): A list of run statuses to filter by. If
            blank, all run statuses will be allowed.
        tags (Optional[Dict[str, Union[str, List[str]]]]): A dictionary of run tags to query by.
            All tags specified here must be present for a given run to pass the filter.
        snapshot_id (Optional[str]): The ID of the job snapshot to query for. Intended for
            internal use.
        updated_after (Optional[DateTime]): Filter by runs that were last updated after this
            datetime.
        created_before (Optional[DateTime]): Filter by runs that were created before this
            datetime.
        mode (Optional[str]): (deprecated)
        pipeline_name (Optional[str]): (deprecated)
    """

    def __new__(
        cls,
        run_ids: Optional[Sequence[str]] = None,
        job_name: Optional[str] = None,
        statuses: Optional[Sequence[DagsterRunStatus]] = None,
        tags: Optional[Mapping[str, Union[str, Sequence[str]]]] = None,
        snapshot_id: Optional[str] = None,
        updated_after: Optional[datetime] = None,
        mode: Optional[str] = None,
        created_before: Optional[datetime] = None,
        pipeline_name: Optional[str] = None,  # for backcompat purposes
    ):
        job_name = job_name or pipeline_name

        check.invariant(run_ids != [], "When filtering on run ids, a non-empty list must be used.")

        return super(RunsFilter, cls).__new__(
            cls,
            run_ids=check.opt_sequence_param(run_ids, "run_ids", of_type=str),
            job_name=check.opt_str_param(job_name, "job_name"),
            statuses=check.opt_sequence_param(statuses, "statuses", of_type=PipelineRunStatus),
            tags=check.opt_mapping_param(tags, "tags", key_type=str),
            snapshot_id=check.opt_str_param(snapshot_id, "snapshot_id"),
            updated_after=check.opt_inst_param(updated_after, "updated_after", datetime),
            mode=check.opt_str_param(mode, "mode"),
            created_before=check.opt_inst_param(created_before, "created_before", datetime),
        )

    @property
    def pipeline_name(self):
        return self.job_name

    @staticmethod
    def for_schedule(schedule):
        return RunsFilter(tags=PipelineRun.tags_for_schedule(schedule))

    @staticmethod
    def for_partition(partition_set, partition):
        return RunsFilter(tags=PipelineRun.tags_for_partition_set(partition_set, partition))

    @staticmethod
    def for_sensor(sensor):
        return RunsFilter(tags=PipelineRun.tags_for_sensor(sensor))

    @staticmethod
    def for_backfill(backfill_id):
        return RunsFilter(tags=PipelineRun.tags_for_backfill_id(backfill_id))
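
# Illustrative sketch (not part of the module source): a filter that matches failed runs of a
# hypothetical job for a hypothetical partition key. All specified fields must match (logical
# AND); PARTITION_NAME_TAG is the tag constant imported at the top of this module.
#
#   runs_filter = RunsFilter(
#       job_name="my_job",
#       statuses=[DagsterRunStatus.FAILURE],
#       tags={PARTITION_NAME_TAG: "2022-01-01"},
#   )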
register_serdes_tuple_fallbacks({"PipelineRunsFilter": RunsFilter}) # DEPRECATED - keeping around for backcompat reasons (some folks might have imported directly) PipelineRunsFilter = RunsFilter class JobBucket(NamedTuple): job_names: List[str] bucket_limit: Optional[int] class TagBucket(NamedTuple): tag_key: str tag_values: List[str] bucket_limit: Optional[int]

class RunRecord(
    NamedTuple(
        "_RunRecord",
        [
            ("storage_id", int),
            ("pipeline_run", PipelineRun),
            ("create_timestamp", datetime),
            ("update_timestamp", datetime),
            ("start_time", Optional[float]),
            ("end_time", Optional[float]),
        ],
    )
):
    """Internal representation of a run record, as stored in a
    :py:class:`~dagster._core.storage.runs.RunStorage`.

    Users should not invoke this class directly.
    """

    def __new__(
        cls,
        storage_id,
        pipeline_run,
        create_timestamp,
        update_timestamp,
        start_time=None,
        end_time=None,
    ):
        return super(RunRecord, cls).__new__(
            cls,
            storage_id=check.int_param(storage_id, "storage_id"),
            pipeline_run=check.inst_param(pipeline_run, "pipeline_run", PipelineRun),
            create_timestamp=check.inst_param(create_timestamp, "create_timestamp", datetime),
            update_timestamp=check.inst_param(update_timestamp, "update_timestamp", datetime),
            # start_time and end_time fields will be populated once the run has started and
            # ended, respectively, but will be None beforehand.
            start_time=check.opt_float_param(start_time, "start_time"),
            end_time=check.opt_float_param(end_time, "end_time"),
        )
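
# Illustrative sketch (not part of the module source): RunRecord values are produced by run
# storage rather than constructed by users. A hypothetical `record` fetched from storage exposes
# the wrapped run plus timing metadata:
#
#   record.pipeline_run.run_id   # the run's id
#   record.start_time            # None until the run has started
#   record.end_time              # None until the run has ended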

@whitelist_for_serdes
class RunPartitionData(
    NamedTuple(
        "_RunPartitionData",
        [
            ("run_id", str),
            ("partition", str),
            ("status", DagsterRunStatus),
            ("start_time", Optional[float]),
            ("end_time", Optional[float]),
        ],
    )
):
    def __new__(
        cls,
        run_id: str,
        partition: str,
        status: DagsterRunStatus,
        start_time: Optional[float],
        end_time: Optional[float],
    ):
        return super(RunPartitionData, cls).__new__(
            cls,
            run_id=check.str_param(run_id, "run_id"),
            partition=check.str_param(partition, "partition"),
            status=check.inst_param(status, "status", DagsterRunStatus),
            start_time=check.opt_inst(start_time, float),
            end_time=check.opt_inst(end_time, float),
        )


###################################################################################################
# GRAVEYARD
#
#            -|-
#             |
#        _-'~~~~~`-_
#      .'           '.
#      |    R I P    |
#      |             |
#      |  Execution  |
#      |  Selector   |
#      |             |
#      |             |
###################################################################################################


@whitelist_for_serdes
class ExecutionSelector(
    NamedTuple("_ExecutionSelector", [("name", str), ("solid_subset", Optional[Sequence[str]])])
):
    """
    Kept here to maintain loading of PipelineRuns from when it was still alive.
    """

    def __new__(cls, name: str, solid_subset: Optional[Sequence[str]] = None):
        return super(ExecutionSelector, cls).__new__(
            cls,
            name=check.str_param(name, "name"),
            solid_subset=None
            if solid_subset is None
            else check.sequence_param(solid_subset, "solid_subset", of_type=str),
        )