Source code for dagster._core.storage.runs.base

from abc import ABC, abstractmethod
from typing import Callable, Iterable, Mapping, Optional, Sequence, Set, Tuple, Union

from dagster._core.events import DagsterEvent
from dagster._core.execution.backfill import BulkActionStatus, PartitionBackfill
from dagster._core.instance import MayHaveInstanceWeakref
from dagster._core.snap import ExecutionPlanSnapshot, PipelineSnapshot
from dagster._core.storage.pipeline_run import (
    JobBucket,
    PipelineRun,
    RunPartitionData,
    RunRecord,
    RunsFilter,
    TagBucket,
)
from dagster._daemon.types import DaemonHeartbeat


[docs]class RunStorage(ABC, MayHaveInstanceWeakref): """Abstract base class for storing pipeline run history. Note that run storages using SQL databases as backing stores should implement :py:class:`~dagster._core.storage.runs.SqlRunStorage`. Users should not directly instantiate concrete subclasses of this class; they are instantiated by internal machinery when ``dagit`` and ``dagster-graphql`` load, based on the values in the ``dagster.yaml`` file in ``$DAGSTER_HOME``. Configuration of concrete subclasses of this class should be done by setting values in that file. """ @abstractmethod def add_run(self, pipeline_run: PipelineRun) -> PipelineRun: """Add a run to storage. If a run already exists with the same ID, raise DagsterRunAlreadyExists If the run's snapshot ID does not exist raise DagsterSnapshotDoesNotExist Args: pipeline_run (PipelineRun): The run to add. """ @abstractmethod def handle_run_event(self, run_id: str, event: DagsterEvent): """Update run storage in accordance to a pipeline run related DagsterEvent Args: run_id (str) event (DagsterEvent) """ @abstractmethod def get_runs( self, filters: Optional[RunsFilter] = None, cursor: Optional[str] = None, limit: Optional[int] = None, bucket_by: Optional[Union[JobBucket, TagBucket]] = None, ) -> Iterable[PipelineRun]: """Return all the runs present in the storage that match the given filters. Args: filters (Optional[RunsFilter]) -- The :py:class:`~dagster._core.storage.pipeline_run.RunsFilter` by which to filter runs cursor (Optional[str]): Starting cursor (run_id) of range of runs limit (Optional[int]): Number of results to get. Defaults to infinite. Returns: List[PipelineRun] """ @abstractmethod def get_runs_count(self, filters: Optional[RunsFilter] = None) -> int: """Return the number of runs present in the storage that match the given filters. Args: filters (Optional[RunsFilter]) -- The :py:class:`~dagster._core.storage.pipeline_run.PipelineRunFilter` by which to filter runs Returns: int: The number of runs that match the given filters. """ @abstractmethod def get_run_group(self, run_id: str) -> Optional[Tuple[str, Iterable[PipelineRun]]]: """Get the run group to which a given run belongs. Args: run_id (str): If the corresponding run is the descendant of some root run (i.e., there is a root_run_id on the :py:class:`PipelineRun`), that root run and all of its descendants are returned; otherwise, the group will consist only of the given run (a run that does not descend from any root is its own root). Returns: Optional[Tuple[string, List[PipelineRun]]]: If there is a corresponding run group, tuple whose first element is the root_run_id and whose second element is a list of all the descendent runs. Otherwise `None`. """ @abstractmethod def get_run_groups( self, filters: Optional[RunsFilter] = None, cursor: Optional[str] = None, limit: Optional[int] = None, ) -> Mapping[str, Mapping[str, Union[Iterable[PipelineRun], int]]]: """Return all of the run groups present in the storage that include rows matching the given filter. Args: filter (Optional[RunsFilter]) -- The :py:class:`~dagster._core.storage.pipeline_run.RunsFilter` by which to filter runs cursor (Optional[str]): Starting cursor (run_id) of range of runs limit (Optional[int]): Number of results to get. Defaults to infinite. Returns: Dict[str, Dict[str, Union[List[PipelineRun], int]]]: Specifically, a dict of the form ``{'pipeline_run_id': {'runs': [PipelineRun, ...], 'count': int}, ...}``. The instances of :py:class:`~dagster._core.pipeline_run.PipelineRun` returned in this data structure correspond to all of the runs that would have been returned by calling :py:meth:`get_run_groups` with the same arguments, plus their corresponding root runs, if any. The keys of this structure are the run_ids of all of the root runs (a run with no root is its own root). The integer counts are inclusive of all of the root runs' children, including those that would not have been returned by calling :py:meth:`get_run_groups` with the same arguments, but exclusive of the root run itself; i.e., if a run has no children, the count will be 0. """ # Note that we could have made the opposite decision here and filtered for root runs # matching a given filter, etc., rather than for child runs; so that asking for the last 5 # run groups would give the last 5 roots and their descendants, rather than the last 5 # children and their roots. Consider the case where we have just been retrying runs # belonging to a group created long ago; it makes sense to bump these to the top of the # interface rather than burying them deeply paginated down. Note also that this query can # return no more run groups than there are runs in an equivalent call to get_runs, and no # more than 2x total instances of PipelineRun. @abstractmethod def get_run_by_id(self, run_id: str) -> Optional[PipelineRun]: """Get a run by its id. Args: run_id (str): The id of the run Returns: Optional[PipelineRun] """ @abstractmethod def get_run_records( self, filters: Optional[RunsFilter] = None, limit: Optional[int] = None, order_by: Optional[str] = None, ascending: bool = False, cursor: Optional[str] = None, bucket_by: Optional[Union[JobBucket, TagBucket]] = None, ) -> Sequence[RunRecord]: """Return a list of run records stored in the run storage, sorted by the given column in given order. Args: filters (Optional[RunsFilter]): the filter by which to filter runs. limit (Optional[int]): Number of results to get. Defaults to infinite. order_by (Optional[str]): Name of the column to sort by. Defaults to id. ascending (Optional[bool]): Sort the result in ascending order if True, descending otherwise. Defaults to descending. Returns: List[RunRecord]: List of run records stored in the run storage. """ @abstractmethod def get_run_tags(self) -> Sequence[Tuple[str, Set[str]]]: """Get a list of tag keys and the values that have been associated with them. Returns: List[Tuple[str, Set[str]]] """ @abstractmethod def add_run_tags(self, run_id: str, new_tags: Mapping[str, str]): """Add additional tags for a pipeline run. Args: run_id (str) new_tags (Dict[string, string]) """ @abstractmethod def has_run(self, run_id: str) -> bool: """Check if the storage contains a run. Args: run_id (str): The id of the run Returns: bool """ def add_snapshot( self, snapshot: Union[PipelineSnapshot, ExecutionPlanSnapshot], snapshot_id: Optional[str] = None, ): """Add a snapshot to the storage. Args: snapshot (Union[PipelineSnapshot, ExecutionPlanSnapshot]) snapshot_id (Optional[str]): [Internal] The id of the snapshot. If not provided, the snapshot id will be generated from a hash of the snapshot. This should only be used in debugging, where we might want to import a historical run whose snapshots were calculated using a different hash function than the current code. """ if isinstance(snapshot, PipelineSnapshot): self.add_pipeline_snapshot(snapshot, snapshot_id) else: self.add_execution_plan_snapshot(snapshot, snapshot_id) def has_snapshot(self, snapshot_id: str): return self.has_pipeline_snapshot(snapshot_id) or self.has_execution_plan_snapshot( snapshot_id ) @abstractmethod def has_pipeline_snapshot(self, pipeline_snapshot_id: str) -> bool: """Check to see if storage contains a pipeline snapshot. Args: pipeline_snapshot_id (str): The id of the run. Returns: bool """ @abstractmethod def add_pipeline_snapshot( self, pipeline_snapshot: PipelineSnapshot, snapshot_id: Optional[str] = None ) -> str: """Add a pipeline snapshot to the run store. Pipeline snapshots are content-addressable, meaning that the ID for a snapshot is a hash based on the body of the snapshot. This function returns that snapshot ID. Args: pipeline_snapshot (PipelineSnapshot) snapshot_id (Optional[str]): [Internal] The id of the snapshot. If not provided, the snapshot id will be generated from a hash of the snapshot. This should only be used in debugging, where we might want to import a historical run whose snapshots were calculated using a different hash function than the current code. Return: str: The pipeline_snapshot_id """ @abstractmethod def get_pipeline_snapshot(self, pipeline_snapshot_id: str) -> PipelineSnapshot: """Fetch a snapshot by ID Args: pipeline_snapshot_id (str) Returns: PipelineSnapshot """ @abstractmethod def has_execution_plan_snapshot(self, execution_plan_snapshot_id: str) -> bool: """Check to see if storage contains an execution plan snapshot. Args: execution_plan_snapshot_id (str): The id of the execution plan. Returns: bool """ @abstractmethod def add_execution_plan_snapshot( self, execution_plan_snapshot: ExecutionPlanSnapshot, snapshot_id: Optional[str] = None ) -> str: """Add an execution plan snapshot to the run store. Execution plan snapshots are content-addressable, meaning that the ID for a snapshot is a hash based on the body of the snapshot. This function returns that snapshot ID. Args: execution_plan_snapshot (ExecutionPlanSnapshot) snapshot_id (Optional[str]): [Internal] The id of the snapshot. If not provided, the snapshot id will be generated from a hash of the snapshot. This should only be used in debugging, where we might want to import a historical run whose snapshots were calculated using a different hash function than the current code. Return: str: The execution_plan_snapshot_id """ @abstractmethod def get_execution_plan_snapshot(self, execution_plan_snapshot_id: str) -> ExecutionPlanSnapshot: """Fetch a snapshot by ID Args: execution_plan_snapshot_id (str) Returns: ExecutionPlanSnapshot """ @abstractmethod def wipe(self): """Clears the run storage.""" @abstractmethod def delete_run(self, run_id: str): """Remove a run from storage""" @property def supports_bucket_queries(self): return True @abstractmethod def get_run_partition_data(self, runs_filter: RunsFilter) -> Sequence[RunPartitionData]: """Get run partition data for a given partitioned job.""" def migrate(self, print_fn: Optional[Callable] = None, force_rebuild_all: bool = False): """Call this method to run any required data migrations""" def optimize(self, print_fn: Optional[Callable] = None, force_rebuild_all: bool = False): """Call this method to run any optional data migrations for optimized reads""" def dispose(self): """Explicit lifecycle management.""" def optimize_for_dagit(self, statement_timeout: int, pool_recycle: int): """Allows for optimizing database connection / use in the context of a long lived dagit process""" # Daemon Heartbeat Storage # # Holds heartbeats from the Dagster Daemon so that other system components can alert when it's not # alive. # This is temporarily placed along with run storage to avoid adding a new instance concept. It # should be split out once all metadata storages are configured together. @abstractmethod def add_daemon_heartbeat(self, daemon_heartbeat: DaemonHeartbeat): """Called on a regular interval by the daemon""" @abstractmethod def get_daemon_heartbeats(self) -> Mapping[str, DaemonHeartbeat]: """Latest heartbeats of all daemon types""" @abstractmethod def wipe_daemon_heartbeats(self): """Wipe all daemon heartbeats""" # Backfill storage @abstractmethod def get_backfills( self, status: Optional[BulkActionStatus] = None, cursor: Optional[str] = None, limit: Optional[int] = None, ) -> Sequence[PartitionBackfill]: """Get a list of partition backfills""" @abstractmethod def get_backfill(self, backfill_id: str) -> Optional[PartitionBackfill]: """Get the partition backfill of the given backfill id.""" @abstractmethod def add_backfill(self, partition_backfill: PartitionBackfill): """Add partition backfill to run storage""" @abstractmethod def update_backfill(self, partition_backfill: PartitionBackfill): """Update a partition backfill in run storage""" def alembic_version(self): return None # Key Value Storage # # Stores arbitrary key-value pairs. Currently used for the cursor for the auto-reexecution # deamon. Bundled into run storage for convenience. def supports_kvs(self): return True @abstractmethod def kvs_get(self, keys: Set[str]) -> Mapping[str, str]: """Retrieve the value for a given key in the current deployment.""" @abstractmethod def kvs_set(self, pairs: Mapping[str, str]) -> None: """Set the value for a given key in the current deployment."""