Source code for dagster._core.launcher.base

from abc import ABC, abstractmethod
from enum import Enum
from typing import NamedTuple, Optional

from dagster._core.instance import MayHaveInstanceWeakref
from dagster._core.origin import PipelinePythonOrigin
from dagster._core.storage.pipeline_run import PipelineRun
from dagster._core.workspace.workspace import IWorkspace
from dagster._serdes import whitelist_for_serdes


class LaunchRunContext(NamedTuple):
    """
    Context available within a run launcher's launch_run call.
    """

    pipeline_run: PipelineRun
    workspace: Optional[IWorkspace]

    @property
    def pipeline_code_origin(self) -> Optional[PipelinePythonOrigin]:
        return self.pipeline_run.pipeline_code_origin


class ResumeRunContext(NamedTuple):
    """
    Context available within a run launcher's resume_run call.
    """

    pipeline_run: PipelineRun
    workspace: Optional[IWorkspace]
    resume_attempt_number: Optional[int] = None

    @property
    def pipeline_code_origin(self) -> Optional[PipelinePythonOrigin]:
        return self.pipeline_run.pipeline_code_origin


@whitelist_for_serdes
class WorkerStatus(Enum):
    RUNNING = "RUNNING"
    NOT_FOUND = "NOT_FOUND"
    FAILED = "FAILED"
    SUCCESS = "SUCCESS"
    UNKNOWN = "UNKNOWN"


class CheckRunHealthResult(NamedTuple):
    """
    Result of a check_run_worker_health call.
    """

    status: WorkerStatus
    msg: Optional[str] = None

    def __str__(self) -> str:
        return f"{self.status.value}: '{self.msg}'"


[docs]class RunLauncher(ABC, MayHaveInstanceWeakref): @abstractmethod def launch_run(self, context: LaunchRunContext) -> None: """Launch a run. This method should begin the execution of the specified run, and may emit engine events. Runs should be created in the instance (e.g., by calling ``DagsterInstance.create_run()``) *before* this method is called, and should be in the ``PipelineRunStatus.STARTING`` state. Typically, this method will not be invoked directly, but should be invoked through ``DagsterInstance.launch_run()``. Args: context (LaunchRunContext): information about the launch - every run launcher will need the PipelineRun, and some run launchers may need information from the IWorkspace from which the run was launched. """ @abstractmethod def terminate(self, run_id): """ Terminates a process. Returns False is the process was already terminated. Returns true if the process was alive and was successfully terminated """ def dispose(self): """ Do any resource cleanup that should happen when the DagsterInstance is cleaning itself up. """ def join(self, timeout=30): pass @property def supports_check_run_worker_health(self): """ Whether the run launcher supports check_run_worker_health. """ return False def check_run_worker_health(self, run: PipelineRun) -> CheckRunHealthResult: raise NotImplementedError( "This run launcher does not support run monitoring. Please disable it on your instance." ) @property def supports_resume_run(self): """ Whether the run launcher supports resume_run. """ return False def resume_run(self, context: ResumeRunContext) -> None: raise NotImplementedError( "This run launcher does not support resuming runs. If using " "run monitoring, set max_resume_run_attempts to 0." )