import logging
import logging.config
import os
import sys
import time
import warnings
import weakref
from collections import defaultdict
from contextlib import ExitStack
from enum import Enum
from tempfile import TemporaryDirectory
from typing import (
TYPE_CHECKING,
AbstractSet,
Any,
Callable,
Dict,
Generic,
Iterable,
List,
Mapping,
Optional,
Sequence,
Set,
Tuple,
TypeVar,
Union,
cast,
)
import yaml
import dagster._check as check
from dagster._annotations import public
from dagster._core.definitions.events import AssetKey
from dagster._core.definitions.pipeline_base import InMemoryPipeline
from dagster._core.definitions.pipeline_definition import (
PipelineDefinition,
PipelineSubsetDefinition,
)
from dagster._core.errors import (
DagsterHomeNotSetError,
DagsterInvariantViolationError,
DagsterRunAlreadyExists,
DagsterRunConflict,
)
from dagster._core.storage.pipeline_run import (
IN_PROGRESS_RUN_STATUSES,
DagsterRun,
JobBucket,
PipelineRun,
PipelineRunStatsSnapshot,
PipelineRunStatus,
RunPartitionData,
RunRecord,
RunsFilter,
TagBucket,
)
from dagster._core.storage.tags import PARENT_RUN_ID_TAG, RESUME_RETRY_TAG, ROOT_RUN_ID_TAG
from dagster._core.system_config.objects import ResolvedRunConfig
from dagster._core.utils import str_format_list
from dagster._serdes import ConfigurableClass
from dagster._seven import get_current_datetime_in_utc
from dagster._utils import merge_dicts, traced
from dagster._utils.backcompat import experimental_functionality_warning
from dagster._utils.error import serializable_error_info_from_exc_info
from .config import (
DAGSTER_CONFIG_YAML_FILENAME,
DEFAULT_LOCAL_CODE_SERVER_STARTUP_TIMEOUT,
get_default_tick_retention_settings,
get_tick_retention_settings,
is_dagster_home_set,
)
from .ref import InstanceRef
# 'airflow_execution_date' and 'is_airflow_ingest_pipeline' are hardcoded tags used in the
# airflow ingestion logic (see: dagster_pipeline_factory.py). 'airflow_execution_date' stores the
# 'execution_date' used in Airflow operator execution and 'is_airflow_ingest_pipeline' determines
# whether 'airflow_execution_date' is needed.
# https://github.com/dagster-io/dagster/issues/2403
AIRFLOW_EXECUTION_DATE_STR = "airflow_execution_date"
IS_AIRFLOW_INGEST_PIPELINE_STR = "is_airflow_ingest_pipeline"
if TYPE_CHECKING:
from dagster._core.debug import DebugRunPayload
from dagster._core.definitions.run_request import InstigatorType
from dagster._core.events import DagsterEvent, DagsterEventType
from dagster._core.events.log import EventLogEntry
from dagster._core.execution.backfill import PartitionBackfill
from dagster._core.execution.plan.resume_retry import ReexecutionStrategy
from dagster._core.execution.stats import RunStepKeyStatsSnapshot
from dagster._core.host_representation import (
ExternalPipeline,
ExternalSensor,
HistoricalPipeline,
RepositoryLocation,
)
from dagster._core.launcher import RunLauncher
from dagster._core.run_coordinator import RunCoordinator
from dagster._core.scheduler import Scheduler
from dagster._core.scheduler.instigation import (
InstigatorState,
InstigatorTick,
TickData,
TickStatus,
)
from dagster._core.secrets import SecretsLoader
from dagster._core.snap import ExecutionPlanSnapshot, PipelineSnapshot
from dagster._core.storage.compute_log_manager import ComputeLogManager
from dagster._core.storage.event_log import EventLogStorage
from dagster._core.storage.event_log.base import AssetRecord, EventLogRecord, EventRecordsFilter
from dagster._core.storage.root import LocalArtifactStorage
from dagster._core.storage.runs import RunStorage
from dagster._core.storage.schedules import ScheduleStorage
from dagster._core.workspace.workspace import IWorkspace
from dagster._daemon.types import DaemonHeartbeat, DaemonStatus
def _check_run_equality(
pipeline_run: PipelineRun, candidate_run: PipelineRun
) -> Mapping[str, Tuple[Any, Any]]:
field_diff = {}
for field in pipeline_run._fields:
expected_value = getattr(pipeline_run, field)
candidate_value = getattr(candidate_run, field)
if expected_value != candidate_value:
field_diff[field] = (expected_value, candidate_value)
return field_diff
def _format_field_diff(field_diff: Mapping[str, Tuple[Any, Any]]) -> str:
return "\n".join(
[
(
" {field_name}:\n"
+ " Expected: {expected_value}\n"
+ " Received: {candidate_value}"
).format(
field_name=field_name,
expected_value=expected_value,
candidate_value=candidate_value,
)
for field_name, (
expected_value,
candidate_value,
) in field_diff.items()
]
)
class _EventListenerLogHandler(logging.Handler):
def __init__(self, instance):
self._instance = instance
super(_EventListenerLogHandler, self).__init__()
def emit(self, record):
from dagster._core.events import EngineEventData
from dagster._core.events.log import StructuredLoggerMessage, construct_event_record
event = construct_event_record(
StructuredLoggerMessage(
name=record.name,
message=record.msg,
level=record.levelno,
meta=record.dagster_meta,
record=record,
)
)
try:
self._instance.handle_new_event(event)
except Exception as e:
sys.stderr.write(f"Exception while writing logger call to event log: {str(e)}\n")
if event.dagster_event:
# Swallow user-generated log failures so that the entire step/run doesn't fail, but
# raise failures writing system-generated log events since they are the source of
# truth for the state of the run
raise
elif event.run_id:
self._instance.report_engine_event(
"Exception while writing logger call to event log",
pipeline_name=event.pipeline_name,
run_id=event.run_id,
step_key=event.step_key,
engine_event_data=EngineEventData(
error=serializable_error_info_from_exc_info(sys.exc_info()),
),
)
class InstanceType(Enum):
PERSISTENT = "PERSISTENT"
EPHEMERAL = "EPHEMERAL"
T_DagsterInstance = TypeVar("T_DagsterInstance", bound="DagsterInstance")
class MayHaveInstanceWeakref(Generic[T_DagsterInstance]):
"""Mixin for classes that can have a weakref back to a Dagster instance."""
def __init__(self):
self._instance_weakref: Optional[weakref.ReferenceType[T_DagsterInstance]] = None
@property
def _instance(self) -> T_DagsterInstance:
instance = (
self._instance_weakref()
# Backcompat with custom subclasses that don't call super().__init__()
# in their own __init__ implementations
if (hasattr(self, "_instance_weakref") and self._instance_weakref is not None)
else None
)
return cast(T_DagsterInstance, instance)
def register_instance(self, instance: T_DagsterInstance):
check.invariant(
# Backcompat with custom subclasses that don't call super().__init__()
# in their own __init__ implementations
(not hasattr(self, "_instance_weakref") or self._instance_weakref is None),
"Must only call initialize once",
)
# Store a weakref to avoid a circular reference / enable GC
self._instance_weakref = weakref.ref(instance)
[docs]class DagsterInstance:
"""Core abstraction for managing Dagster's access to storage and other resources.
Use DagsterInstance.get() to grab the current DagsterInstance which will load based on
the values in the ``dagster.yaml`` file in ``$DAGSTER_HOME``.
Alternatively, DagsterInstance.ephemeral() can use used which provides a set of
transient in-memory components.
Configuration of this class should be done by setting values in ``$DAGSTER_HOME/dagster.yaml``.
For example, to use Postgres for dagster storage, you can write a ``dagster.yaml`` such as the
following:
.. literalinclude:: ../../../../../examples/docs_snippets/docs_snippets/deploying/dagster-pg.yaml
:caption: dagster.yaml
:language: YAML
Args:
instance_type (InstanceType): Indicates whether the instance is ephemeral or persistent.
Users should not attempt to set this value directly or in their ``dagster.yaml`` files.
local_artifact_storage (LocalArtifactStorage): The local artifact storage is used to
configure storage for any artifacts that require a local disk, such as schedules, or
when using the filesystem system storage to manage files and intermediates. By default,
this will be a :py:class:`dagster._core.storage.root.LocalArtifactStorage`. Configurable
in ``dagster.yaml`` using the :py:class:`~dagster.serdes.ConfigurableClass`
machinery.
run_storage (RunStorage): The run storage is used to store metadata about ongoing and past
pipeline runs. By default, this will be a
:py:class:`dagster._core.storage.runs.SqliteRunStorage`. Configurable in ``dagster.yaml``
using the :py:class:`~dagster.serdes.ConfigurableClass` machinery.
event_storage (EventLogStorage): Used to store the structured event logs generated by
pipeline runs. By default, this will be a
:py:class:`dagster._core.storage.event_log.SqliteEventLogStorage`. Configurable in
``dagster.yaml`` using the :py:class:`~dagster.serdes.ConfigurableClass` machinery.
compute_log_manager (ComputeLogManager): The compute log manager handles stdout and stderr
logging for solid compute functions. By default, this will be a
:py:class:`dagster._core.storage.local_compute_log_manager.LocalComputeLogManager`.
Configurable in ``dagster.yaml`` using the
:py:class:`~dagster.serdes.ConfigurableClass` machinery.
run_coordinator (RunCoordinator): A runs coordinator may be used to manage the execution
of pipeline runs.
run_launcher (Optional[RunLauncher]): Optionally, a run launcher may be used to enable
a Dagster instance to launch pipeline runs, e.g. on a remote Kubernetes cluster, in
addition to running them locally.
settings (Optional[Dict]): Specifies certain per-instance settings,
such as feature flags. These are set in the ``dagster.yaml`` under a set of whitelisted
keys.
ref (Optional[InstanceRef]): Used by internal machinery to pass instances across process
boundaries.
"""
_PROCESS_TEMPDIR: Optional[TemporaryDirectory] = None
_EXIT_STACK = None
def __init__(
self,
instance_type: InstanceType,
local_artifact_storage: "LocalArtifactStorage",
run_storage: "RunStorage",
event_storage: "EventLogStorage",
compute_log_manager: "ComputeLogManager",
run_coordinator: "RunCoordinator",
run_launcher: "RunLauncher",
scheduler: Optional["Scheduler"] = None,
schedule_storage: Optional["ScheduleStorage"] = None,
settings: Optional[Mapping[str, Any]] = None,
secrets_loader: Optional["SecretsLoader"] = None,
ref: Optional[InstanceRef] = None,
):
from dagster._core.launcher import RunLauncher
from dagster._core.run_coordinator import RunCoordinator
from dagster._core.scheduler import Scheduler
from dagster._core.secrets import SecretsLoader
from dagster._core.storage.compute_log_manager import ComputeLogManager
from dagster._core.storage.event_log import EventLogStorage
from dagster._core.storage.root import LocalArtifactStorage
from dagster._core.storage.runs import RunStorage
from dagster._core.storage.schedules import ScheduleStorage
self._instance_type = check.inst_param(instance_type, "instance_type", InstanceType)
self._local_artifact_storage = check.inst_param(
local_artifact_storage, "local_artifact_storage", LocalArtifactStorage
)
self._event_storage = check.inst_param(event_storage, "event_storage", EventLogStorage)
self._event_storage.register_instance(self)
self._run_storage = check.inst_param(run_storage, "run_storage", RunStorage)
self._run_storage.register_instance(self)
self._compute_log_manager = check.inst_param(
compute_log_manager, "compute_log_manager", ComputeLogManager
)
self._compute_log_manager.register_instance(self)
self._scheduler = check.opt_inst_param(scheduler, "scheduler", Scheduler)
self._schedule_storage = check.opt_inst_param(
schedule_storage, "schedule_storage", ScheduleStorage
)
if self._schedule_storage:
self._schedule_storage.register_instance(self)
self._run_coordinator = check.inst_param(run_coordinator, "run_coordinator", RunCoordinator)
self._run_coordinator.register_instance(self)
self._run_launcher = check.inst_param(run_launcher, "run_launcher", RunLauncher)
self._run_launcher.register_instance(self)
self._settings = check.opt_mapping_param(settings, "settings")
self._secrets_loader = check.opt_inst_param(secrets_loader, "secrets_loader", SecretsLoader)
if self._secrets_loader:
self._secrets_loader.register_instance(self)
self._ref = check.opt_inst_param(ref, "ref", InstanceRef)
self._subscribers: Dict[str, List[Callable]] = defaultdict(list)
run_monitoring_enabled = self.run_monitoring_settings.get("enabled", False)
if run_monitoring_enabled and not self.run_launcher.supports_check_run_worker_health:
run_monitoring_enabled = False
warnings.warn(
"The configured run launcher does not support run monitoring, disabling it.",
)
self._run_monitoring_enabled = run_monitoring_enabled
if self.run_monitoring_enabled and self.run_monitoring_max_resume_run_attempts:
check.invariant(
self.run_launcher.supports_resume_run,
"The configured run launcher does not support resuming runs. "
"Set max_resume_run_attempts to 0 to use run monitoring. Any runs with a failed run "
"worker will be marked as failed, but will not be resumed.",
)
if self.run_retries_enabled:
check.invariant(
self.run_storage.supports_kvs(),
"Run retries are enabled, but the configured run storage does not support them. "
"Consider switching to Postgres or Mysql.",
)
check.invariant(
self.event_log_storage.supports_event_consumer_queries(),
"Run retries are enabled, but the configured event log storage does not support them. "
"Consider switching to Postgres or Mysql.",
)
# ctors
@public
@staticmethod
def ephemeral(
tempdir: Optional[str] = None, preload: Optional[Sequence["DebugRunPayload"]] = None
) -> "DagsterInstance":
from dagster._core.launcher.sync_in_memory_run_launcher import SyncInMemoryRunLauncher
from dagster._core.run_coordinator import DefaultRunCoordinator
from dagster._core.storage.event_log import InMemoryEventLogStorage
from dagster._core.storage.noop_compute_log_manager import NoOpComputeLogManager
from dagster._core.storage.root import LocalArtifactStorage
from dagster._core.storage.runs import InMemoryRunStorage
if tempdir is None:
tempdir = DagsterInstance.temp_storage()
return DagsterInstance(
instance_type=InstanceType.EPHEMERAL,
local_artifact_storage=LocalArtifactStorage(tempdir),
run_storage=InMemoryRunStorage(preload=preload),
event_storage=InMemoryEventLogStorage(preload=preload),
compute_log_manager=NoOpComputeLogManager(),
run_coordinator=DefaultRunCoordinator(),
run_launcher=SyncInMemoryRunLauncher(),
)
@public
@staticmethod
def get() -> "DagsterInstance":
dagster_home_path = os.getenv("DAGSTER_HOME")
if not dagster_home_path:
raise DagsterHomeNotSetError(
(
"The environment variable $DAGSTER_HOME is not set. \n"
"Dagster requires this environment variable to be set to an existing directory in your filesystem. "
"This directory is used to store metadata across sessions, or load the dagster.yaml "
"file which can configure storing metadata in an external database.\n"
"You can resolve this error by exporting the environment variable. For example, you can run the following command in your shell or include it in your shell configuration file:\n"
'\texport DAGSTER_HOME=~"/dagster_home"\n'
"or PowerShell\n"
"$env:DAGSTER_HOME = ($home + '\\dagster_home')"
"or batch"
"set DAGSTER_HOME=%UserProfile%/dagster_home"
"Alternatively, DagsterInstance.ephemeral() can be used for a transient instance.\n"
)
)
dagster_home_path = os.path.expanduser(dagster_home_path)
if not os.path.isabs(dagster_home_path):
raise DagsterInvariantViolationError(
(
'$DAGSTER_HOME "{}" must be an absolute path. Dagster requires this '
"environment variable to be set to an existing directory in your filesystem."
).format(dagster_home_path)
)
if not (os.path.exists(dagster_home_path) and os.path.isdir(dagster_home_path)):
raise DagsterInvariantViolationError(
(
'$DAGSTER_HOME "{}" is not a directory or does not exist. Dagster requires this '
"environment variable to be set to an existing directory in your filesystem"
).format(dagster_home_path)
)
return DagsterInstance.from_config(dagster_home_path)
@public
@staticmethod
def local_temp(tempdir=None, overrides=None) -> "DagsterInstance":
if tempdir is None:
tempdir = DagsterInstance.temp_storage()
return DagsterInstance.from_ref(InstanceRef.from_dir(tempdir, overrides=overrides))
@staticmethod
def from_config(
config_dir: str,
config_filename: str = DAGSTER_CONFIG_YAML_FILENAME,
) -> "DagsterInstance":
instance_ref = InstanceRef.from_dir(config_dir, config_filename=config_filename)
return DagsterInstance.from_ref(instance_ref)
@staticmethod
def from_ref(instance_ref: InstanceRef) -> "DagsterInstance":
check.inst_param(instance_ref, "instance_ref", InstanceRef)
# DagsterInstance doesn't implement ConfigurableClass, but we may still sometimes want to
# have custom subclasses of DagsterInstance. This machinery allows for those custom
# subclasses to receive additional keyword arguments passed through the config YAML.
klass = instance_ref.custom_instance_class or DagsterInstance
kwargs = instance_ref.custom_instance_class_config
unified_storage = instance_ref.storage
run_storage = unified_storage.run_storage if unified_storage else instance_ref.run_storage
event_storage = (
unified_storage.event_log_storage if unified_storage else instance_ref.event_storage
)
schedule_storage = (
unified_storage.schedule_storage if unified_storage else instance_ref.schedule_storage
)
return klass( # type: ignore
instance_type=InstanceType.PERSISTENT,
local_artifact_storage=instance_ref.local_artifact_storage,
run_storage=run_storage,
event_storage=event_storage,
schedule_storage=schedule_storage,
compute_log_manager=instance_ref.compute_log_manager,
scheduler=instance_ref.scheduler,
run_coordinator=instance_ref.run_coordinator,
run_launcher=instance_ref.run_launcher,
settings=instance_ref.settings,
secrets_loader=instance_ref.secrets_loader,
ref=instance_ref,
**kwargs,
)
# flags
@property
def is_persistent(self) -> bool:
return self._instance_type == InstanceType.PERSISTENT
@property
def is_ephemeral(self) -> bool:
return self._instance_type == InstanceType.EPHEMERAL
def get_ref(self) -> InstanceRef:
if self._ref:
return self._ref
check.failed(
"Attempted to prepare an ineligible DagsterInstance ({inst_type}) for cross "
"process communication.{dagster_home_msg}".format(
inst_type=self._instance_type,
dagster_home_msg="\nDAGSTER_HOME environment variable is not set, set it to "
"a directory on the filesystem for dagster to use for storage and cross "
"process coordination."
if os.getenv("DAGSTER_HOME") is None
else "",
)
)
@property
def root_directory(self) -> str:
return self._local_artifact_storage.base_dir
@staticmethod
def temp_storage() -> str:
from dagster._core.test_utils import environ
if DagsterInstance._PROCESS_TEMPDIR is None:
DagsterInstance._EXIT_STACK = ExitStack()
DagsterInstance._EXIT_STACK.enter_context(
environ({"DAGSTER_TELEMETRY_DISABLED": "yes"})
)
DagsterInstance._PROCESS_TEMPDIR = TemporaryDirectory()
return cast(TemporaryDirectory, DagsterInstance._PROCESS_TEMPDIR).name
def _info(self, component):
# ConfigurableClass may not have inst_data if it's a direct instantiation
# which happens for ephemeral instances
if isinstance(component, ConfigurableClass) and component.inst_data:
return component.inst_data.info_dict()
if type(component) is dict:
return component
return component.__class__.__name__
def _info_str_for_component(self, component_name, component):
return yaml.dump(
{component_name: self._info(component)}, default_flow_style=False, sort_keys=False
)
def info_dict(self):
settings = self._settings if self._settings else {}
ret = {
"local_artifact_storage": self._info(self._local_artifact_storage),
"run_storage": self._info(self._run_storage),
"event_log_storage": self._info(self._event_storage),
"compute_logs": self._info(self._compute_log_manager),
"schedule_storage": self._info(self._schedule_storage),
"scheduler": self._info(self._scheduler),
"run_coordinator": self._info(self._run_coordinator),
"run_launcher": self._info(self._run_launcher),
}
ret.update(
{
settings_key: self._info(settings_value)
for settings_key, settings_value in settings.items()
}
)
return ret
def info_str(self) -> str:
return yaml.dump(self.info_dict(), default_flow_style=False, sort_keys=False)
def schema_str(self) -> str:
def _schema_dict(alembic_version):
if not alembic_version:
return None
db_revision, head_revision = alembic_version
return {
"current": db_revision,
"latest": head_revision,
}
return yaml.dump(
{
"schema": {
"event_log_storage": _schema_dict(self._event_storage.alembic_version()),
"run_storage": _schema_dict(self._event_storage.alembic_version()),
"schedule_storage": _schema_dict(self._event_storage.alembic_version()),
}
},
default_flow_style=False,
sort_keys=False,
)
@property
def run_storage(self) -> "RunStorage":
return self._run_storage
@property
def event_log_storage(self) -> "EventLogStorage":
return self._event_storage
# schedule storage
@property
def schedule_storage(self) -> Optional["ScheduleStorage"]:
return self._schedule_storage
@property
def scheduler(self) -> Optional["Scheduler"]:
return self._scheduler
@property
def scheduler_class(self) -> Optional[str]:
return self.scheduler.__class__.__name__ if self.scheduler else None
# run coordinator
@property
def run_coordinator(self) -> "RunCoordinator":
return self._run_coordinator
# run launcher
@property
def run_launcher(self) -> "RunLauncher":
return self._run_launcher
# compute logs
@property
def compute_log_manager(self) -> "ComputeLogManager":
return self._compute_log_manager
def get_settings(self, settings_key: str) -> Any:
check.str_param(settings_key, "settings_key")
if self._settings and settings_key in self._settings:
return self._settings.get(settings_key)
return {}
@property
def telemetry_enabled(self) -> bool:
if self.is_ephemeral:
return False
dagster_telemetry_enabled_default = True
telemetry_settings = self.get_settings("telemetry")
if not telemetry_settings:
return dagster_telemetry_enabled_default
if "enabled" in telemetry_settings:
return telemetry_settings["enabled"]
else:
return dagster_telemetry_enabled_default
# run monitoring
@property
def run_monitoring_enabled(self) -> bool:
return self._run_monitoring_enabled
@property
def run_monitoring_settings(self) -> Mapping:
return self.get_settings("run_monitoring")
@property
def run_monitoring_start_timeout_seconds(self) -> int:
return self.run_monitoring_settings.get("start_timeout_seconds", 180)
@property
def code_server_settings(self) -> Mapping:
return self.get_settings("code_servers")
@property
def code_server_process_startup_timeout(self) -> int:
return self.code_server_settings.get(
"local_startup_timeout", DEFAULT_LOCAL_CODE_SERVER_STARTUP_TIMEOUT
)
@property
def run_monitoring_max_resume_run_attempts(self) -> int:
default_max_resume_run_attempts = 3 if self.run_launcher.supports_resume_run else 0
return self.run_monitoring_settings.get(
"max_resume_run_attempts", default_max_resume_run_attempts
)
@property
def run_monitoring_poll_interval_seconds(self) -> int:
return self.run_monitoring_settings.get("poll_interval_seconds", 120)
@property
def cancellation_thread_poll_interval_seconds(self) -> int:
return self.get_settings("run_monitoring").get(
"cancellation_thread_poll_interval_seconds", 10
)
@property
def run_retries_enabled(self) -> bool:
return self.get_settings("run_retries").get("enabled", False)
@property
def run_retries_max_retries(self) -> int:
return self.get_settings("run_retries").get("max_retries")
# python logs
@property
def managed_python_loggers(self) -> Sequence[str]:
python_log_settings = self.get_settings("python_logs") or {}
return python_log_settings.get("managed_python_loggers", [])
@property
def python_log_level(self) -> Optional[str]:
python_log_settings = self.get_settings("python_logs") or {}
return python_log_settings.get("python_log_level")
def upgrade(self, print_fn=None):
from dagster._core.storage.migration.utils import upgrading_instance
with upgrading_instance(self):
if print_fn:
print_fn("Updating run storage...")
self._run_storage.upgrade()
self._run_storage.migrate(print_fn)
if print_fn:
print_fn("Updating event storage...")
self._event_storage.upgrade()
self._event_storage.reindex_assets(print_fn=print_fn)
if print_fn:
print_fn("Updating schedule storage...")
self._schedule_storage.upgrade()
self._schedule_storage.migrate(print_fn)
def optimize_for_dagit(self, statement_timeout, pool_recycle):
if self._schedule_storage:
self._schedule_storage.optimize_for_dagit(
statement_timeout=statement_timeout, pool_recycle=pool_recycle
)
self._run_storage.optimize_for_dagit(
statement_timeout=statement_timeout, pool_recycle=pool_recycle
)
self._event_storage.optimize_for_dagit(
statement_timeout=statement_timeout, pool_recycle=pool_recycle
)
def reindex(self, print_fn=lambda _: None):
print_fn("Checking for reindexing...")
self._event_storage.reindex_events(print_fn)
self._event_storage.reindex_assets(print_fn)
self._run_storage.optimize(print_fn)
self._schedule_storage.optimize(print_fn)
print_fn("Done.")
def dispose(self):
self._run_storage.dispose()
self.run_coordinator.dispose()
self._run_launcher.dispose()
self._event_storage.dispose()
self._compute_log_manager.dispose()
if self._secrets_loader:
self._secrets_loader.dispose()
# run storage
@public
@traced
def get_run_by_id(self, run_id: str) -> Optional[DagsterRun]:
return cast(DagsterRun, self._run_storage.get_run_by_id(run_id))
@traced
def get_pipeline_snapshot(self, snapshot_id: str) -> "PipelineSnapshot":
return self._run_storage.get_pipeline_snapshot(snapshot_id)
@traced
def has_pipeline_snapshot(self, snapshot_id: str) -> bool:
return self._run_storage.has_pipeline_snapshot(snapshot_id)
@traced
def has_snapshot(self, snapshot_id: str) -> bool:
return self._run_storage.has_snapshot(snapshot_id)
@traced
def get_historical_pipeline(self, snapshot_id: str) -> "HistoricalPipeline":
from dagster._core.host_representation import HistoricalPipeline
snapshot = self._run_storage.get_pipeline_snapshot(snapshot_id)
parent_snapshot = (
self._run_storage.get_pipeline_snapshot(snapshot.lineage_snapshot.parent_snapshot_id)
if snapshot.lineage_snapshot
else None
)
return HistoricalPipeline(snapshot, snapshot_id, parent_snapshot)
@traced
def has_historical_pipeline(self, snapshot_id: str) -> bool:
return self._run_storage.has_pipeline_snapshot(snapshot_id)
@traced
def get_execution_plan_snapshot(self, snapshot_id: str) -> "ExecutionPlanSnapshot":
return self._run_storage.get_execution_plan_snapshot(snapshot_id)
@traced
def get_run_stats(self, run_id: str) -> PipelineRunStatsSnapshot:
return self._event_storage.get_stats_for_run(run_id)
@traced
def get_run_step_stats(self, run_id, step_keys=None) -> Sequence["RunStepKeyStatsSnapshot"]:
return self._event_storage.get_step_stats_for_run(run_id, step_keys)
@traced
def get_run_tags(self) -> Sequence[Tuple[str, Set[str]]]:
return self._run_storage.get_run_tags()
@traced
def get_run_group(self, run_id: str) -> Optional[Tuple[str, Iterable[PipelineRun]]]:
return self._run_storage.get_run_group(run_id)
def create_run_for_pipeline(
self,
pipeline_def,
execution_plan=None,
run_id=None,
run_config=None,
mode=None,
solids_to_execute=None,
status=None,
tags=None,
root_run_id=None,
parent_run_id=None,
solid_selection=None,
asset_selection=None,
external_pipeline_origin=None,
pipeline_code_origin=None,
repository_load_data=None,
) -> PipelineRun:
from dagster._core.definitions.job_definition import JobDefinition
from dagster._core.execution.api import create_execution_plan
from dagster._core.execution.plan.plan import ExecutionPlan
from dagster._core.snap import snapshot_from_execution_plan
check.inst_param(pipeline_def, "pipeline_def", PipelineDefinition)
check.opt_inst_param(execution_plan, "execution_plan", ExecutionPlan)
# note that solids_to_execute is required to execute the solid subset, which is the
# frozenset version of the previous solid_subset.
# solid_selection is not required and will not be converted to solids_to_execute here.
# i.e. this function doesn't handle solid queries.
# solid_selection is only used to pass the user queries further down.
check.opt_set_param(solids_to_execute, "solids_to_execute", of_type=str)
check.opt_list_param(solid_selection, "solid_selection", of_type=str)
check.opt_set_param(asset_selection, "asset_selection", of_type=AssetKey)
if solids_to_execute:
if isinstance(pipeline_def, PipelineSubsetDefinition):
# for the case when pipeline_def is created by IPipeline or ExternalPipeline
check.invariant(
solids_to_execute == pipeline_def.solids_to_execute,
"Cannot create a PipelineRun from pipeline subset {pipeline_solids_to_execute} "
"that conflicts with solids_to_execute arg {solids_to_execute}".format(
pipeline_solids_to_execute=str_format_list(pipeline_def.solids_to_execute),
solids_to_execute=str_format_list(solids_to_execute),
),
)
else:
# for cases when `create_run_for_pipeline` is directly called
pipeline_def = pipeline_def.get_pipeline_subset_def(
solids_to_execute=solids_to_execute
)
if asset_selection and isinstance(pipeline_def, JobDefinition):
# for cases when `create_run_for_pipeline` is directly called
pipeline_def = pipeline_def.get_job_def_for_subset_selection(
asset_selection=asset_selection
)
step_keys_to_execute = None
if execution_plan:
step_keys_to_execute = execution_plan.step_keys_to_execute
else:
execution_plan = create_execution_plan(
pipeline=InMemoryPipeline(pipeline_def),
run_config=run_config,
mode=mode,
instance_ref=self.get_ref() if self.is_persistent else None,
tags=tags,
repository_load_data=repository_load_data,
)
return self.create_run(
pipeline_name=pipeline_def.name,
run_id=run_id,
run_config=run_config,
mode=check.opt_str_param(mode, "mode", default=pipeline_def.get_default_mode_name()),
solid_selection=solid_selection,
asset_selection=asset_selection,
solids_to_execute=solids_to_execute,
step_keys_to_execute=step_keys_to_execute,
status=status,
tags=tags,
root_run_id=root_run_id,
parent_run_id=parent_run_id,
pipeline_snapshot=pipeline_def.get_pipeline_snapshot(),
execution_plan_snapshot=snapshot_from_execution_plan(
execution_plan,
pipeline_def.get_pipeline_snapshot_id(),
),
parent_pipeline_snapshot=pipeline_def.get_parent_pipeline_snapshot(),
external_pipeline_origin=external_pipeline_origin,
pipeline_code_origin=pipeline_code_origin,
)
def _construct_run_with_snapshots(
self,
pipeline_name,
run_id,
run_config,
mode,
solids_to_execute,
step_keys_to_execute,
status,
tags,
root_run_id,
parent_run_id,
pipeline_snapshot,
execution_plan_snapshot,
parent_pipeline_snapshot,
asset_selection=None,
solid_selection=None,
external_pipeline_origin=None,
pipeline_code_origin=None,
) -> DagsterRun:
# https://github.com/dagster-io/dagster/issues/2403
if tags and IS_AIRFLOW_INGEST_PIPELINE_STR in tags:
if AIRFLOW_EXECUTION_DATE_STR not in tags:
tags[AIRFLOW_EXECUTION_DATE_STR] = get_current_datetime_in_utc().isoformat()
check.invariant(
not (not pipeline_snapshot and execution_plan_snapshot),
"It is illegal to have an execution plan snapshot and not have a pipeline snapshot. "
"It is possible to have no execution plan snapshot since we persist runs "
"that do not successfully compile execution plans in the scheduled case.",
)
pipeline_snapshot_id = (
self._ensure_persisted_pipeline_snapshot(pipeline_snapshot, parent_pipeline_snapshot)
if pipeline_snapshot
else None
)
execution_plan_snapshot_id = (
self._ensure_persisted_execution_plan_snapshot(
execution_plan_snapshot, pipeline_snapshot_id, step_keys_to_execute
)
if execution_plan_snapshot and pipeline_snapshot_id
else None
)
return DagsterRun(
pipeline_name=pipeline_name,
run_id=run_id,
run_config=run_config,
mode=mode,
asset_selection=asset_selection,
solid_selection=solid_selection,
solids_to_execute=solids_to_execute,
step_keys_to_execute=step_keys_to_execute,
status=status,
tags=tags,
root_run_id=root_run_id,
parent_run_id=parent_run_id,
pipeline_snapshot_id=pipeline_snapshot_id,
execution_plan_snapshot_id=execution_plan_snapshot_id,
external_pipeline_origin=external_pipeline_origin,
pipeline_code_origin=pipeline_code_origin,
has_repository_load_data=execution_plan_snapshot is not None
and execution_plan_snapshot.repository_load_data is not None,
)
def _ensure_persisted_pipeline_snapshot(self, pipeline_snapshot, parent_pipeline_snapshot):
from dagster._core.snap import PipelineSnapshot, create_pipeline_snapshot_id
check.inst_param(pipeline_snapshot, "pipeline_snapshot", PipelineSnapshot)
check.opt_inst_param(parent_pipeline_snapshot, "parent_pipeline_snapshot", PipelineSnapshot)
if pipeline_snapshot.lineage_snapshot:
if not self._run_storage.has_pipeline_snapshot(
pipeline_snapshot.lineage_snapshot.parent_snapshot_id
):
check.invariant(
create_pipeline_snapshot_id(parent_pipeline_snapshot)
== pipeline_snapshot.lineage_snapshot.parent_snapshot_id,
"Parent pipeline snapshot id out of sync with passed parent pipeline snapshot",
)
returned_pipeline_snapshot_id = self._run_storage.add_pipeline_snapshot(
parent_pipeline_snapshot
)
check.invariant(
pipeline_snapshot.lineage_snapshot.parent_snapshot_id
== returned_pipeline_snapshot_id
)
pipeline_snapshot_id = create_pipeline_snapshot_id(pipeline_snapshot)
if not self._run_storage.has_pipeline_snapshot(pipeline_snapshot_id):
returned_pipeline_snapshot_id = self._run_storage.add_pipeline_snapshot(
pipeline_snapshot
)
check.invariant(pipeline_snapshot_id == returned_pipeline_snapshot_id)
return pipeline_snapshot_id
def _ensure_persisted_execution_plan_snapshot(
self, execution_plan_snapshot, pipeline_snapshot_id, step_keys_to_execute
):
from dagster._core.snap.execution_plan_snapshot import (
ExecutionPlanSnapshot,
create_execution_plan_snapshot_id,
)
check.inst_param(execution_plan_snapshot, "execution_plan_snapshot", ExecutionPlanSnapshot)
check.str_param(pipeline_snapshot_id, "pipeline_snapshot_id")
check.opt_nullable_list_param(step_keys_to_execute, "step_keys_to_execute", of_type=str)
check.invariant(
execution_plan_snapshot.pipeline_snapshot_id == pipeline_snapshot_id,
(
"Snapshot mismatch: Snapshot ID in execution plan snapshot is "
'"{ep_pipeline_snapshot_id}" and snapshot_id created in memory is '
'"{pipeline_snapshot_id}"'
).format(
ep_pipeline_snapshot_id=execution_plan_snapshot.pipeline_snapshot_id,
pipeline_snapshot_id=pipeline_snapshot_id,
),
)
execution_plan_snapshot_id = create_execution_plan_snapshot_id(execution_plan_snapshot)
if not self._run_storage.has_execution_plan_snapshot(execution_plan_snapshot_id):
returned_execution_plan_snapshot_id = self._run_storage.add_execution_plan_snapshot(
execution_plan_snapshot
)
check.invariant(execution_plan_snapshot_id == returned_execution_plan_snapshot_id)
return execution_plan_snapshot_id
def _log_asset_materialization_planned_events(self, pipeline_run, execution_plan_snapshot):
from dagster._core.events import (
AssetMaterializationPlannedData,
DagsterEvent,
DagsterEventType,
)
pipeline_name = pipeline_run.pipeline_name
for step in execution_plan_snapshot.steps:
if step.key in execution_plan_snapshot.step_keys_to_execute:
for output in step.outputs:
asset_key = output.properties.asset_key
if asset_key:
# Logs and stores asset_materialization_planned event
event = DagsterEvent(
event_type_value=DagsterEventType.ASSET_MATERIALIZATION_PLANNED.value,
pipeline_name=pipeline_name,
message=f"{pipeline_name} intends to materialize asset {asset_key.to_string()}",
event_specific_data=AssetMaterializationPlannedData(asset_key),
)
self.report_dagster_event(event, pipeline_run.run_id, logging.DEBUG)
def create_run(
self,
pipeline_name,
run_id,
run_config,
mode,
solids_to_execute,
step_keys_to_execute,
status,
tags,
root_run_id,
parent_run_id,
pipeline_snapshot,
execution_plan_snapshot,
parent_pipeline_snapshot,
asset_selection=None,
solid_selection=None,
external_pipeline_origin=None,
pipeline_code_origin=None,
) -> PipelineRun:
pipeline_run = self._construct_run_with_snapshots(
pipeline_name=pipeline_name,
run_id=run_id,
run_config=run_config,
mode=mode,
asset_selection=asset_selection,
solid_selection=solid_selection,
solids_to_execute=solids_to_execute,
step_keys_to_execute=step_keys_to_execute,
status=status,
tags=tags,
root_run_id=root_run_id,
parent_run_id=parent_run_id,
pipeline_snapshot=pipeline_snapshot,
execution_plan_snapshot=execution_plan_snapshot,
parent_pipeline_snapshot=parent_pipeline_snapshot,
external_pipeline_origin=external_pipeline_origin,
pipeline_code_origin=pipeline_code_origin,
)
pipeline_run = self._run_storage.add_run(pipeline_run)
if execution_plan_snapshot:
self._log_asset_materialization_planned_events(pipeline_run, execution_plan_snapshot)
return pipeline_run
def create_reexecuted_run(
self,
parent_run: DagsterRun,
repo_location: "RepositoryLocation",
external_pipeline: "ExternalPipeline",
strategy: "ReexecutionStrategy",
extra_tags: Optional[Mapping[str, Any]] = None,
run_config: Optional[Mapping[str, Any]] = None,
mode: Optional[str] = None,
use_parent_run_tags: bool = False,
) -> PipelineRun:
from dagster._core.execution.plan.resume_retry import (
ReexecutionStrategy,
get_retry_steps_from_parent_run,
)
from dagster._core.host_representation import ExternalPipeline, RepositoryLocation
check.inst_param(parent_run, "parent_run", DagsterRun)
check.inst_param(repo_location, "repo_location", RepositoryLocation)
check.inst_param(external_pipeline, "external_pipeline", ExternalPipeline)
check.inst_param(strategy, "strategy", ReexecutionStrategy)
check.opt_mapping_param(extra_tags, "extra_tags", key_type=str)
check.opt_mapping_param(run_config, "run_config", key_type=str)
check.opt_str_param(mode, "mode")
check.bool_param(use_parent_run_tags, "use_parent_run_tags")
root_run_id = parent_run.root_run_id or parent_run.run_id
parent_run_id = parent_run.run_id
tags = merge_dicts(
external_pipeline.tags,
# these can differ from external_pipeline.tags if tags were added at launch time
parent_run.tags if use_parent_run_tags else {},
extra_tags or {},
{
PARENT_RUN_ID_TAG: parent_run_id,
ROOT_RUN_ID_TAG: root_run_id,
},
)
mode = cast(str, mode if mode is not None else parent_run.mode)
run_config = run_config if run_config is not None else parent_run.run_config
if strategy == ReexecutionStrategy.FROM_FAILURE:
check.invariant(
parent_run.status == PipelineRunStatus.FAILURE,
"Cannot reexecute from failure a run that is not failed",
)
step_keys_to_execute, known_state = get_retry_steps_from_parent_run(
self,
parent_run=parent_run,
)
tags[RESUME_RETRY_TAG] = "true"
elif strategy == ReexecutionStrategy.ALL_STEPS:
step_keys_to_execute = None
known_state = None
else:
raise DagsterInvariantViolationError(f"Unknown reexecution strategy: {strategy}")
external_execution_plan = repo_location.get_external_execution_plan(
external_pipeline,
run_config,
mode=mode,
step_keys_to_execute=step_keys_to_execute,
known_state=known_state,
instance=self,
)
return self.create_run(
pipeline_name=parent_run.pipeline_name,
run_id=None,
run_config=run_config,
mode=mode,
solids_to_execute=parent_run.solids_to_execute,
step_keys_to_execute=step_keys_to_execute,
status=PipelineRunStatus.NOT_STARTED,
tags=tags,
root_run_id=root_run_id,
parent_run_id=parent_run_id,
pipeline_snapshot=external_pipeline.pipeline_snapshot,
execution_plan_snapshot=external_execution_plan.execution_plan_snapshot,
parent_pipeline_snapshot=external_pipeline.parent_pipeline_snapshot,
solid_selection=parent_run.solid_selection,
asset_selection=parent_run.asset_selection,
external_pipeline_origin=external_pipeline.get_external_origin(),
pipeline_code_origin=external_pipeline.get_python_origin(),
)
def register_managed_run(
self,
pipeline_name,
run_id,
run_config,
mode,
solids_to_execute,
step_keys_to_execute,
tags,
root_run_id,
parent_run_id,
pipeline_snapshot,
execution_plan_snapshot,
parent_pipeline_snapshot,
solid_selection=None,
pipeline_code_origin=None,
):
# The usage of this method is limited to dagster-airflow, specifically in Dagster
# Operators that are executed in Airflow. Because a common workflow in Airflow is to
# retry dags from arbitrary tasks, we need any node to be capable of creating a
# PipelineRun.
#
# The try-except DagsterRunAlreadyExists block handles the race when multiple "root" tasks
# simultaneously execute self._run_storage.add_run(pipeline_run). When this happens, only
# one task succeeds in creating the run, while the others get DagsterRunAlreadyExists
# error; at this point, the failed tasks try again to fetch the existing run.
# https://github.com/dagster-io/dagster/issues/2412
pipeline_run = self._construct_run_with_snapshots(
pipeline_name=pipeline_name,
run_id=run_id,
run_config=run_config,
mode=mode,
solid_selection=solid_selection,
solids_to_execute=solids_to_execute,
step_keys_to_execute=step_keys_to_execute,
status=PipelineRunStatus.MANAGED,
tags=tags,
root_run_id=root_run_id,
parent_run_id=parent_run_id,
pipeline_snapshot=pipeline_snapshot,
execution_plan_snapshot=execution_plan_snapshot,
parent_pipeline_snapshot=parent_pipeline_snapshot,
pipeline_code_origin=pipeline_code_origin,
)
def get_run():
candidate_run = self.get_run_by_id(pipeline_run.run_id)
field_diff = _check_run_equality(pipeline_run, candidate_run)
if field_diff:
raise DagsterRunConflict(
"Found conflicting existing run with same id {run_id}. Runs differ in:"
"\n{field_diff}".format(
run_id=pipeline_run.run_id,
field_diff=_format_field_diff(field_diff),
),
)
return candidate_run
if self.has_run(pipeline_run.run_id):
return get_run()
try:
return self._run_storage.add_run(pipeline_run)
except DagsterRunAlreadyExists:
return get_run()
@traced
def add_run(self, pipeline_run: PipelineRun) -> PipelineRun:
return self._run_storage.add_run(pipeline_run)
@traced
def add_snapshot(self, snapshot, snapshot_id=None):
return self._run_storage.add_snapshot(snapshot, snapshot_id)
@traced
def handle_run_event(self, run_id: str, event: "DagsterEvent"):
return self._run_storage.handle_run_event(run_id, event)
@traced
def add_run_tags(self, run_id: str, new_tags: Mapping[str, str]):
return self._run_storage.add_run_tags(run_id, new_tags)
@traced
def has_run(self, run_id: str) -> bool:
return self._run_storage.has_run(run_id)
@traced
def get_runs(
self,
filters: Optional[RunsFilter] = None,
cursor: Optional[str] = None,
limit: Optional[int] = None,
bucket_by: Optional[Union[JobBucket, TagBucket]] = None,
) -> Iterable[PipelineRun]:
return self._run_storage.get_runs(filters, cursor, limit, bucket_by)
@traced
def get_runs_count(self, filters: Optional[RunsFilter] = None) -> int:
return self._run_storage.get_runs_count(filters)
@traced
def get_run_groups(
self,
filters: Optional[RunsFilter] = None,
cursor: Optional[str] = None,
limit: Optional[int] = None,
) -> Mapping[str, Mapping[str, Union[Iterable[PipelineRun], int]]]:
return self._run_storage.get_run_groups(filters=filters, cursor=cursor, limit=limit)
@public
@traced
def get_run_records(
self,
filters: Optional[RunsFilter] = None,
limit: Optional[int] = None,
order_by: Optional[str] = None,
ascending: bool = False,
cursor: Optional[str] = None,
bucket_by: Optional[Union[JobBucket, TagBucket]] = None,
) -> Sequence[RunRecord]:
"""Return a list of run records stored in the run storage, sorted by the given column in given order.
Args:
filters (Optional[RunsFilter]): the filter by which to filter runs.
limit (Optional[int]): Number of results to get. Defaults to infinite.
order_by (Optional[str]): Name of the column to sort by. Defaults to id.
ascending (Optional[bool]): Sort the result in ascending order if True, descending
otherwise. Defaults to descending.
Returns:
List[RunRecord]: List of run records stored in the run storage.
"""
return self._run_storage.get_run_records(
filters, limit, order_by, ascending, cursor, bucket_by
)
@property
def supports_bucket_queries(self):
return self._run_storage.supports_bucket_queries
@traced
def get_run_partition_data(self, runs_filter: RunsFilter) -> Sequence[RunPartitionData]:
"""Get run partition data for a given partitioned job."""
return self._run_storage.get_run_partition_data(runs_filter)
def wipe(self):
self._run_storage.wipe()
self._event_storage.wipe()
@public
@traced
def delete_run(self, run_id: str):
self._run_storage.delete_run(run_id)
self._event_storage.delete_events(run_id)
# event storage
@traced
def logs_after(
self,
run_id,
cursor,
of_type: Optional["DagsterEventType"] = None,
limit: Optional[int] = None,
):
return self._event_storage.get_logs_for_run(
run_id,
cursor=cursor,
of_type=of_type,
limit=limit,
)
@traced
def all_logs(
self, run_id, of_type: Optional[Union["DagsterEventType", Set["DagsterEventType"]]] = None
):
return self._event_storage.get_logs_for_run(run_id, of_type=of_type)
@traced
def get_records_for_run(
self,
run_id: str,
cursor: Optional[str] = None,
of_type: Optional[Union["DagsterEventType", Set["DagsterEventType"]]] = None,
limit: Optional[int] = None,
):
return self._event_storage.get_records_for_run(run_id, cursor, of_type, limit)
def watch_event_logs(self, run_id, cursor, cb):
return self._event_storage.watch(run_id, cursor, cb)
def end_watch_event_logs(self, run_id, cb):
return self._event_storage.end_watch(run_id, cb)
# asset storage
@traced
def all_asset_keys(self):
return self._event_storage.all_asset_keys()
@public
@traced
def get_asset_keys(self, prefix=None, limit=None, cursor=None):
return self._event_storage.get_asset_keys(prefix=prefix, limit=limit, cursor=cursor)
@public
@traced
def has_asset_key(self, asset_key: AssetKey) -> bool:
return self._event_storage.has_asset_key(asset_key)
@traced
def get_latest_materialization_events(
self, asset_keys: Sequence[AssetKey]
) -> Mapping[AssetKey, Optional["EventLogEntry"]]:
return self._event_storage.get_latest_materialization_events(asset_keys)
@public
@traced
def get_latest_materialization_event(self, asset_key: AssetKey) -> Optional["EventLogEntry"]:
return self._event_storage.get_latest_materialization_events([asset_key]).get(asset_key)
@public
@traced
def get_event_records(
self,
event_records_filter: "EventRecordsFilter",
limit: Optional[int] = None,
ascending: bool = False,
) -> Iterable["EventLogRecord"]:
"""Return a list of event records stored in the event log storage.
Args:
event_records_filter (Optional[EventRecordsFilter]): the filter by which to filter event
records.
limit (Optional[int]): Number of results to get. Defaults to infinite.
ascending (Optional[bool]): Sort the result in ascending order if True, descending
otherwise. Defaults to descending.
Returns:
List[EventLogRecord]: List of event log records stored in the event log storage.
"""
return self._event_storage.get_event_records(event_records_filter, limit, ascending)
@public
@traced
def get_asset_records(
self, asset_keys: Optional[Sequence[AssetKey]] = None
) -> Iterable["AssetRecord"]:
return self._event_storage.get_asset_records(asset_keys)
@traced
def get_event_tags_for_asset(
self, asset_key: AssetKey, filter_tags: Optional[Mapping[str, str]] = None
) -> Sequence[Mapping[str, str]]:
"""
Fetches asset event tags for the given asset key.
If filter_tags is provided, searches for events containing all of the filter tags. Then,
returns all tags for those events. This enables searching for multipartitioned asset
partition tags with a fixed dimension value, e.g. all of the tags for events where
"country" == "US".
Returns a list of dicts, where each dict is a mapping of tag key to tag value for a
single event.
"""
return self._event_storage.get_event_tags_for_asset(asset_key, filter_tags)
@traced
def run_ids_for_asset_key(self, asset_key):
check.inst_param(asset_key, "asset_key", AssetKey)
return self._event_storage.get_asset_run_ids(asset_key)
@public
@traced
def wipe_assets(self, asset_keys):
check.list_param(asset_keys, "asset_keys", of_type=AssetKey)
for asset_key in asset_keys:
self._event_storage.wipe_asset(asset_key)
@traced
def get_materialization_count_by_partition(
self, asset_keys: Sequence[AssetKey]
) -> Mapping[AssetKey, Mapping[str, int]]:
return self._event_storage.get_materialization_count_by_partition(asset_keys)
# event subscriptions
def _get_yaml_python_handlers(self):
if self._settings:
logging_config = self.get_settings("python_logs").get("dagster_handler_config", {})
if logging_config:
experimental_functionality_warning("Handling yaml-defined logging configuration")
# Handlers can only be retrieved from dictConfig configuration if they are attached
# to a logger. We add a dummy logger to the configuration that allows us to access user
# defined handlers.
handler_names = logging_config.get("handlers", {}).keys()
dagster_dummy_logger_name = "dagster_dummy_logger"
processed_dict_conf = {
"version": 1,
"disable_existing_loggers": False,
"loggers": {dagster_dummy_logger_name: {"handlers": handler_names}},
}
processed_dict_conf.update(logging_config)
logging.config.dictConfig(processed_dict_conf)
dummy_logger = logging.getLogger(dagster_dummy_logger_name)
return dummy_logger.handlers
return []
def _get_event_log_handler(self):
event_log_handler = _EventListenerLogHandler(self)
event_log_handler.setLevel(10)
return event_log_handler
def get_handlers(self):
handlers = [self._get_event_log_handler()]
handlers.extend(self._get_yaml_python_handlers())
return handlers
def store_event(self, event):
self._event_storage.store_event(event)
def handle_new_event(self, event):
run_id = event.run_id
self._event_storage.store_event(event)
if event.is_dagster_event and event.dagster_event.is_pipeline_event:
self._run_storage.handle_run_event(run_id, event.dagster_event)
for sub in self._subscribers[run_id]:
sub(event)
def add_event_listener(self, run_id, cb):
self._subscribers[run_id].append(cb)
def report_engine_event(
self,
message,
pipeline_run=None,
engine_event_data=None,
cls=None,
step_key=None,
pipeline_name=None,
run_id=None,
):
"""
Report a EngineEvent that occurred outside of a pipeline execution context.
"""
from dagster._core.events import DagsterEvent, DagsterEventType, EngineEventData
check.opt_class_param(cls, "cls")
check.str_param(message, "message")
check.opt_inst_param(pipeline_run, "pipeline_run", PipelineRun)
check.opt_str_param(run_id, "run_id")
check.opt_str_param(pipeline_name, "pipeline_name")
check.invariant(
pipeline_run or (pipeline_name and run_id),
"Must include either pipeline_run or pipeline_name and run_id",
)
run_id = run_id if run_id else pipeline_run.run_id
pipeline_name = pipeline_name if pipeline_name else pipeline_run.pipeline_name
engine_event_data = check.opt_inst_param(
engine_event_data,
"engine_event_data",
EngineEventData,
EngineEventData([]),
)
if cls:
message = "[{}] {}".format(cls.__name__, message)
log_level = logging.INFO
if engine_event_data and engine_event_data.error:
log_level = logging.ERROR
dagster_event = DagsterEvent(
event_type_value=DagsterEventType.ENGINE_EVENT.value,
pipeline_name=pipeline_name,
message=message,
event_specific_data=engine_event_data,
step_key=step_key,
)
self.report_dagster_event(dagster_event, run_id=run_id, log_level=log_level)
return dagster_event
def report_dagster_event(
self,
dagster_event: "DagsterEvent",
run_id: str,
log_level: Union[str, int] = logging.INFO,
):
"""
Takes a DagsterEvent and stores it in persistent storage for the corresponding PipelineRun
"""
from dagster._core.events.log import EventLogEntry
event_record = EventLogEntry(
user_message="",
level=log_level,
pipeline_name=dagster_event.pipeline_name,
run_id=run_id,
error_info=None,
timestamp=time.time(),
step_key=dagster_event.step_key,
dagster_event=dagster_event,
)
self.handle_new_event(event_record)
def report_run_canceling(self, run, message=None):
from dagster._core.events import DagsterEvent, DagsterEventType
check.inst_param(run, "run", PipelineRun)
message = check.opt_str_param(
message,
"message",
"Sending run termination request.",
)
canceling_event = DagsterEvent(
event_type_value=DagsterEventType.PIPELINE_CANCELING.value,
pipeline_name=run.pipeline_name,
message=message,
)
self.report_dagster_event(canceling_event, run_id=run.run_id)
def report_run_canceled(
self,
pipeline_run,
message=None,
):
from dagster._core.events import DagsterEvent, DagsterEventType
check.inst_param(pipeline_run, "pipeline_run", PipelineRun)
message = check.opt_str_param(
message,
"mesage",
"This run has been marked as canceled from outside the execution context.",
)
dagster_event = DagsterEvent(
event_type_value=DagsterEventType.PIPELINE_CANCELED.value,
pipeline_name=pipeline_run.pipeline_name,
message=message,
)
self.report_dagster_event(
dagster_event, run_id=pipeline_run.run_id, log_level=logging.ERROR
)
return dagster_event
def report_run_failed(self, pipeline_run, message=None):
from dagster._core.events import DagsterEvent, DagsterEventType
check.inst_param(pipeline_run, "pipeline_run", PipelineRun)
message = check.opt_str_param(
message,
"message",
"This run has been marked as failed from outside the execution context.",
)
dagster_event = DagsterEvent(
event_type_value=DagsterEventType.PIPELINE_FAILURE.value,
pipeline_name=pipeline_run.pipeline_name,
message=message,
)
self.report_dagster_event(
dagster_event, run_id=pipeline_run.run_id, log_level=logging.ERROR
)
return dagster_event
# directories
def file_manager_directory(self, run_id):
return self._local_artifact_storage.file_manager_dir(run_id)
def storage_directory(self):
return self._local_artifact_storage.storage_dir
def schedules_directory(self):
return self._local_artifact_storage.schedules_dir
# Runs coordinator
def submit_run(self, run_id, workspace: "IWorkspace") -> PipelineRun:
"""Submit a pipeline run to the coordinator.
This method delegates to the ``RunCoordinator``, configured on the instance, and will
call its implementation of ``RunCoordinator.submit_run()`` to send the run to the
coordinator for execution. Runs should be created in the instance (e.g., by calling
``DagsterInstance.create_run()``) *before* this method is called, and
should be in the ``PipelineRunStatus.NOT_STARTED`` state. They also must have a non-null
ExternalPipelineOrigin.
Args:
run_id (str): The id of the run.
"""
from dagster._core.host_representation import ExternalPipelineOrigin
from dagster._core.origin import PipelinePythonOrigin
from dagster._core.run_coordinator import SubmitRunContext
run = self.get_run_by_id(run_id)
if run is None:
raise DagsterInvariantViolationError(
f"Could not load run {run_id} that was passed to submit_run"
)
check.inst(
run.external_pipeline_origin,
ExternalPipelineOrigin,
"External pipeline origin must be set for submitted runs",
)
check.inst(
run.pipeline_code_origin,
PipelinePythonOrigin,
"Python origin must be set for submitted runs",
)
try:
submitted_run = self._run_coordinator.submit_run(
SubmitRunContext(run, workspace=workspace)
)
except:
from dagster._core.events import EngineEventData
error = serializable_error_info_from_exc_info(sys.exc_info())
self.report_engine_event(
error.message,
run,
EngineEventData.engine_error(error),
)
self.report_run_failed(run)
raise
return submitted_run
# Run launcher
def launch_run(self, run_id: str, workspace: "IWorkspace"):
"""Launch a pipeline run.
This method is typically called using `instance.submit_run` rather than being invoked
directly. This method delegates to the ``RunLauncher``, if any, configured on the instance,
and will call its implementation of ``RunLauncher.launch_run()`` to begin the execution of
the specified run. Runs should be created in the instance (e.g., by calling
``DagsterInstance.create_run()``) *before* this method is called, and should be in the
``PipelineRunStatus.NOT_STARTED`` state.
Args:
run_id (str): The id of the run the launch.
"""
from dagster._core.events import DagsterEvent, DagsterEventType, EngineEventData
from dagster._core.launcher import LaunchRunContext
run = self.get_run_by_id(run_id)
if run is None:
raise DagsterInvariantViolationError(
f"Could not load run {run_id} that was passed to launch_run"
)
launch_started_event = DagsterEvent(
event_type_value=DagsterEventType.PIPELINE_STARTING.value,
pipeline_name=run.pipeline_name,
)
self.report_dagster_event(launch_started_event, run_id=run.run_id)
run = self.get_run_by_id(run_id)
if run is None:
check.failed(f"Failed to reload run {run_id}")
try:
self._run_launcher.launch_run(LaunchRunContext(pipeline_run=run, workspace=workspace))
except:
error = serializable_error_info_from_exc_info(sys.exc_info())
self.report_engine_event(
error.message,
run,
EngineEventData.engine_error(error),
)
self.report_run_failed(run)
raise
return run
def resume_run(self, run_id: str, workspace: "IWorkspace", attempt_number: int):
"""Resume a pipeline run.
This method should be called on runs which have already been launched, but whose run workers
have died.
Args:
run_id (str): The id of the run the launch.
"""
from dagster._core.events import EngineEventData
from dagster._core.launcher import ResumeRunContext
from dagster._daemon.monitoring import RESUME_RUN_LOG_MESSAGE
run = self.get_run_by_id(run_id)
if run is None:
raise DagsterInvariantViolationError(
f"Could not load run {run_id} that was passed to resume_run"
)
if run.status not in IN_PROGRESS_RUN_STATUSES:
raise DagsterInvariantViolationError(
f"Run {run_id} is not in a state that can be resumed"
)
self.report_engine_event(
RESUME_RUN_LOG_MESSAGE,
run,
)
try:
self._run_launcher.resume_run(
ResumeRunContext(
pipeline_run=run,
workspace=workspace,
resume_attempt_number=attempt_number,
)
)
except:
error = serializable_error_info_from_exc_info(sys.exc_info())
self.report_engine_event(
error.message,
run,
EngineEventData.engine_error(error),
)
self.report_run_failed(run)
raise
return run
def count_resume_run_attempts(self, run_id: str):
from dagster._daemon.monitoring import count_resume_run_attempts
return count_resume_run_attempts(self, run_id)
def run_will_resume(self, run_id: str):
if not self.run_monitoring_enabled:
return False
return self.count_resume_run_attempts(run_id) < self.run_monitoring_max_resume_run_attempts
# Scheduler
def start_schedule(self, external_schedule):
return self._scheduler.start_schedule(self, external_schedule)
def stop_schedule(self, schedule_origin_id, schedule_selector_id, external_schedule):
return self._scheduler.stop_schedule(
self, schedule_origin_id, schedule_selector_id, external_schedule
)
def scheduler_debug_info(self):
from dagster._core.definitions.run_request import InstigatorType
from dagster._core.scheduler import SchedulerDebugInfo
errors = []
schedules = []
for schedule_state in self.all_instigator_state(instigator_type=InstigatorType.SCHEDULE):
schedule_info = {
schedule_state.instigator_name: {
"status": schedule_state.status.value,
"cron_schedule": schedule_state.instigator_data.cron_schedule,
"schedule_origin_id": schedule_state.instigator_origin_id,
"repository_origin_id": schedule_state.repository_origin_id,
}
}
schedules.append(yaml.safe_dump(schedule_info, default_flow_style=False))
return SchedulerDebugInfo(
scheduler_config_info=self._info_str_for_component("Scheduler", self.scheduler),
scheduler_info=self.scheduler.debug_info(),
schedule_storage=schedules,
errors=errors,
)
# Schedule / Sensor Storage
def start_sensor(self, external_sensor: "ExternalSensor"):
from dagster._core.definitions.run_request import InstigatorType
from dagster._core.scheduler.instigation import (
InstigatorState,
InstigatorStatus,
SensorInstigatorData,
)
stored_state = self.get_instigator_state(
external_sensor.get_external_origin_id(), external_sensor.selector_id
)
computed_state = external_sensor.get_current_instigator_state(stored_state)
if computed_state.is_running:
return computed_state
if not stored_state:
return self.add_instigator_state(
InstigatorState(
external_sensor.get_external_origin(),
InstigatorType.SENSOR,
InstigatorStatus.RUNNING,
SensorInstigatorData(min_interval=external_sensor.min_interval_seconds),
)
)
else:
return self.update_instigator_state(stored_state.with_status(InstigatorStatus.RUNNING))
def stop_sensor(
self,
instigator_origin_id: str,
selector_id: str,
external_sensor: Optional["ExternalSensor"],
):
from dagster._core.definitions.run_request import InstigatorType
from dagster._core.scheduler.instigation import (
InstigatorState,
InstigatorStatus,
SensorInstigatorData,
)
stored_state = self.get_instigator_state(instigator_origin_id, selector_id)
computed_state: InstigatorState
if external_sensor:
computed_state = external_sensor.get_current_instigator_state(stored_state)
else:
computed_state = check.not_none(stored_state)
if not computed_state.is_running:
return computed_state
if not stored_state:
assert external_sensor
return self.add_instigator_state(
InstigatorState(
external_sensor.get_external_origin(),
InstigatorType.SENSOR,
InstigatorStatus.STOPPED,
SensorInstigatorData(min_interval=external_sensor.min_interval_seconds),
)
)
else:
return self.update_instigator_state(stored_state.with_status(InstigatorStatus.STOPPED))
@traced
def all_instigator_state(
self, repository_origin_id=None, repository_selector_id=None, instigator_type=None
):
if not self._schedule_storage:
check.failed("Schedule storage not available")
return self._schedule_storage.all_instigator_state(
repository_origin_id, repository_selector_id, instigator_type
)
@traced
def get_instigator_state(self, origin_id: str, selector_id: str) -> Optional["InstigatorState"]:
if not self._schedule_storage:
check.failed("Schedule storage not available")
return self._schedule_storage.get_instigator_state(origin_id, selector_id)
def add_instigator_state(self, state: "InstigatorState") -> "InstigatorState":
if not self._schedule_storage:
check.failed("Schedule storage not available")
return self._schedule_storage.add_instigator_state(state)
def update_instigator_state(self, state: "InstigatorState") -> "InstigatorState":
if not self._schedule_storage:
check.failed("Schedule storage not available")
return self._schedule_storage.update_instigator_state(state)
def delete_instigator_state(self, origin_id, selector_id):
return self._schedule_storage.delete_instigator_state(origin_id, selector_id)
@property
def supports_batch_tick_queries(self):
return self._schedule_storage and self._schedule_storage.supports_batch_queries
@traced
def get_batch_ticks(
self,
selector_ids: Sequence[str],
limit: Optional[int] = None,
statuses: Optional[Sequence["TickStatus"]] = None,
) -> Mapping[str, Iterable["InstigatorTick"]]:
if not self._schedule_storage:
return {}
return self._schedule_storage.get_batch_ticks(selector_ids, limit, statuses)
@traced
def get_tick(self, origin_id, selector_id, timestamp):
matches = self._schedule_storage.get_ticks(
origin_id, selector_id, before=timestamp + 1, after=timestamp - 1, limit=1
)
return matches[0] if len(matches) else None
@traced
def get_ticks(self, origin_id, selector_id, before=None, after=None, limit=None, statuses=None):
return self._schedule_storage.get_ticks(
origin_id, selector_id, before=before, after=after, limit=limit, statuses=statuses
)
def create_tick(self, tick_data: "TickData") -> "InstigatorTick":
return check.not_none(self._schedule_storage).create_tick(tick_data)
def update_tick(self, tick: "InstigatorTick"):
return check.not_none(self._schedule_storage).update_tick(tick)
def purge_ticks(self, origin_id, selector_id, before, tick_statuses=None):
self._schedule_storage.purge_ticks(origin_id, selector_id, before, tick_statuses)
def wipe_all_schedules(self):
if self._scheduler:
self._scheduler.wipe(self)
self._schedule_storage.wipe()
def logs_path_for_schedule(self, schedule_origin_id):
return self._scheduler.get_logs_path(self, schedule_origin_id)
def __enter__(self):
return self
def __exit__(self, exception_type, exception_value, traceback):
self.dispose()
if DagsterInstance._EXIT_STACK:
DagsterInstance._EXIT_STACK.close()
# dagster daemon
def add_daemon_heartbeat(self, daemon_heartbeat: "DaemonHeartbeat"):
"""Called on a regular interval by the daemon"""
self._run_storage.add_daemon_heartbeat(daemon_heartbeat)
def get_daemon_heartbeats(self) -> Mapping[str, "DaemonHeartbeat"]:
"""Latest heartbeats of all daemon types"""
return self._run_storage.get_daemon_heartbeats()
def wipe_daemon_heartbeats(self):
self._run_storage.wipe_daemon_heartbeats()
def get_required_daemon_types(self):
from dagster._core.run_coordinator import QueuedRunCoordinator
from dagster._core.scheduler import DagsterDaemonScheduler
from dagster._daemon.auto_run_reexecution.event_log_consumer import EventLogConsumerDaemon
from dagster._daemon.daemon import (
BackfillDaemon,
MonitoringDaemon,
SchedulerDaemon,
SensorDaemon,
)
from dagster._daemon.run_coordinator.queued_run_coordinator_daemon import (
QueuedRunCoordinatorDaemon,
)
if self.is_ephemeral:
return []
daemons = [SensorDaemon.daemon_type(), BackfillDaemon.daemon_type()]
if isinstance(self.scheduler, DagsterDaemonScheduler):
daemons.append(SchedulerDaemon.daemon_type())
if isinstance(self.run_coordinator, QueuedRunCoordinator):
daemons.append(QueuedRunCoordinatorDaemon.daemon_type())
if self.run_monitoring_enabled:
daemons.append(MonitoringDaemon.daemon_type())
if self.run_retries_enabled:
daemons.append(EventLogConsumerDaemon.daemon_type())
return daemons
def get_daemon_statuses(
self, daemon_types: Optional[Sequence[str]] = None
) -> Mapping[str, "DaemonStatus"]:
"""
Get the current status of the daemons. If daemon_types aren't provided, defaults to all
required types. Returns a dict of daemon type to status.
"""
from dagster._daemon.controller import get_daemon_statuses
check.opt_sequence_param(daemon_types, "daemon_types", of_type=str)
return get_daemon_statuses(
self, daemon_types=daemon_types or self.get_required_daemon_types(), ignore_errors=True
)
@property
def daemon_skip_heartbeats_without_errors(self):
# If enabled, daemon threads won't write heartbeats unless they encounter an error. This is
# enabled in cloud, where we don't need to use heartbeats to check if daemons are running, but
# do need to surface errors to users. This is an optimization to reduce DB writes.
return False
# backfill
def get_backfills(self, status=None, cursor=None, limit=None) -> Sequence["PartitionBackfill"]:
return self._run_storage.get_backfills(status=status, cursor=cursor, limit=limit)
def get_backfill(self, backfill_id: str) -> Optional["PartitionBackfill"]:
return self._run_storage.get_backfill(backfill_id)
def add_backfill(self, partition_backfill: "PartitionBackfill") -> None:
self._run_storage.add_backfill(partition_backfill)
def update_backfill(self, partition_backfill: "PartitionBackfill") -> None:
self._run_storage.update_backfill(partition_backfill)
@property
def should_start_background_run_thread(self) -> bool:
"""
Gate on an experimental feature to start a thread that monitors for if the run should be canceled.
"""
return False
def get_tick_retention_settings(
self, instigator_type: "InstigatorType"
) -> Mapping["TickStatus", int]:
from dagster._core.definitions.run_request import InstigatorType
retention_settings = self.get_settings("retention")
tick_settings = (
retention_settings.get("schedule")
if instigator_type == InstigatorType.SCHEDULE
else retention_settings.get("sensor")
)
default_tick_settings = get_default_tick_retention_settings(instigator_type)
return get_tick_retention_settings(tick_settings, default_tick_settings)
def inject_env_vars(self, location_name: Optional[str]):
if not self._secrets_loader:
return
new_env = self._secrets_loader.get_secrets_for_environment(location_name)
for k, v in new_env.items():
os.environ[k] = v