Utilities

dagster.file_relative_path(dunderfile, relative_path)[source]

Get a path relative to the currently executing Python file.

This function is useful when you need to load a file whose location is relative to the current file, for example when you encode a configuration file path in a source file and want it to be runnable from any current working directory.

Parameters:
  • dunderfile (str) – Should always be __file__.

  • relative_path (str) – Path to get relative to the currently executing file.

Examples:

file_relative_path(__file__, 'path/relative/to/file')
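As a rough sketch of the idea, the same result can be obtained with the standard library by joining the directory of the given file with the relative path. The helper name sketch_file_relative_path is hypothetical, and Dagster's own implementation may differ in detail.

```python
import os


def sketch_file_relative_path(dunderfile: str, relative_path: str) -> str:
    """Hypothetical stand-in: resolve relative_path against dunderfile's directory."""
    return os.path.join(os.path.dirname(dunderfile), relative_path)


# The result depends only on where the file lives, not on the working directory.
example = sketch_file_relative_path(
    os.path.join("project", "jobs", "my_job.py"), "config.yaml"
)
```

Because the path is anchored to the source file, the same call resolves to the same location no matter where the process was started.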

dagster.config_from_files(config_files)[source]

Constructs run config from YAML files.

Parameters:

config_files (List[str]) – List of paths or glob patterns for YAML files to load and parse as the run config.

Returns:

A run config dictionary constructed from provided YAML files.

Return type:

Dict[str, Any]

Raises:
  • FileNotFoundError – When a path or glob pattern in config_files matches no files.

  • DagsterInvariantViolationError – When one of the YAML files is invalid and has a parse error.
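The path and glob handling described above can be illustrated with the standard library alone. This is a minimal sketch under stated assumptions: expand_config_files is a hypothetical helper, not a Dagster function, and YAML parsing is left out so the example has no third-party dependencies.

```python
import glob
import os
import tempfile
from typing import List


def expand_config_files(config_files: List[str]) -> List[str]:
    """Hypothetical helper: expand each path or glob pattern into file paths."""
    resolved: List[str] = []
    for pattern in config_files:
        matches = sorted(glob.glob(pattern))
        if not matches:
            # Mirrors the documented behavior: an entry matching nothing is an error.
            raise FileNotFoundError(f'"{pattern}" produced no results.')
        resolved.extend(matches)
    return resolved


# Small demonstration in a temporary directory.
with tempfile.TemporaryDirectory() as tmp:
    for name in ("base.yaml", "local.yaml"):
        with open(os.path.join(tmp, name), "w") as f:
            f.write("ops: {}\n")

    paths = expand_config_files([os.path.join(tmp, "*.yaml")])
    found = [os.path.basename(p) for p in paths]

    try:
        expand_config_files([os.path.join(tmp, "missing.yaml")])
        missing_raised = False
    except FileNotFoundError:
        missing_raised = True
```

A plain path is treated like a pattern that matches at most one file, so a missing file and an unmatched glob fail in the same way.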

dagster.config_from_pkg_resources(pkg_resource_defs)[source]

Load a run config from a package resource, using pkg_resources.resource_string().

Example:

config_from_pkg_resources(
    pkg_resource_defs=[
        ('dagster_examples.airline_demo.environments', 'local_base.yaml'),
        ('dagster_examples.airline_demo.environments', 'local_warehouse.yaml'),
    ],
)

Parameters:

pkg_resource_defs (List[(str, str)]) – List of (package, resource filename) pairs to load as the run config.

Returns:

A run config dictionary constructed from the YAML in the provided package resources.

Return type:

Dict[str, Any]

Raises:

DagsterInvariantViolationError – When one of the YAML documents is invalid and has a parse error.

dagster.config_from_yaml_strings(yaml_strings)[source]

Static constructor for run configs from YAML strings.

Parameters:

yaml_strings (List[str]) – List of yaml strings to parse as the run config.

Returns:

A run config dictionary constructed from the provided YAML strings.

Return type:

Dict[str, Any]

Raises:

DagsterInvariantViolationError – When one of the YAML documents is invalid and has a parse error.
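When several YAML documents are supplied, they end up as a single run config dictionary. The sketch below shows one plausible way to combine them, a recursive merge over already-parsed dictionaries. It assumes that later documents take precedence on conflicting keys; deep_merge is a hypothetical helper, and parsing is omitted to keep the example dependency-free.

```python
from functools import reduce
from typing import Any, Dict


def deep_merge(base: Dict[str, Any], override: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical helper: merge override into a copy of base, recursing into dicts."""
    merged = dict(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = deep_merge(merged[key], value)
        else:
            # Assumption: on conflict, the later document wins.
            merged[key] = value
    return merged


# Stand-ins for two YAML documents after parsing.
parsed_documents = [
    {"ops": {"load": {"config": {"path": "a.csv"}}}},
    {"ops": {"load": {"config": {"limit": 10}}}, "loggers": {"console": {}}},
]
run_config = reduce(deep_merge, parsed_documents, {})
```

Splitting config this way lets a shared base document be combined with small environment-specific overrides.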

dagster.get_dagster_logger(name=None)[source]

Creates a python logger whose output messages will be captured and converted into Dagster log messages. This means they will have structured information such as the step_key, run_id, etc. embedded into them, and will show up in the Dagster event log.

This can be used as a more convenient alternative to context.log in most cases. If log level is not set explicitly, defaults to DEBUG.

Parameters:

name (Optional[str]) – If supplied, will create a logger with the name “dagster.builtin.{name}”, with properties inherited from the base Dagster logger. If omitted, the returned logger will be named “dagster.builtin”.

Returns:

A logger whose output will be captured by Dagster.

Return type:

logging.Logger

Example:

from dagster import get_dagster_logger, op

@op
def hello_op():
    log = get_dagster_logger()
    for i in range(5):
        # do something
        log.info(f"Did {i+1} things!")
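The naming and inheritance behavior described for the name parameter follows the standard logging hierarchy. The sketch below reproduces only that naming scheme with the standard library; sketch_get_logger is a hypothetical stand-in and does not capture messages into the Dagster event log.

```python
import logging
from typing import Optional

BASE_LOGGER_NAME = "dagster.builtin"  # base name taken from the text above


def sketch_get_logger(name: Optional[str] = None) -> logging.Logger:
    """Hypothetical stand-in showing the naming scheme only."""
    full_name = f"{BASE_LOGGER_NAME}.{name}" if name else BASE_LOGGER_NAME
    return logging.getLogger(full_name)


base = sketch_get_logger()
base.setLevel(logging.DEBUG)

# A named logger is a child of the base logger, so it inherits
# the base logger's level unless its own level is set explicitly.
child = sketch_get_logger("my_module")
```

Dotted logger names are what make the child inherit its settings from the base logger.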

class dagster.ExperimentalWarning[source]

dagster.make_email_on_run_failure_sensor(email_from, email_password, email_to, email_body_fn=<function _default_failure_email_body>, email_subject_fn=<function _default_failure_email_subject>, smtp_host='smtp.gmail.com', smtp_type='SSL', smtp_port=None, name=None, dagit_base_url=None, monitored_jobs=None, job_selection=None, monitor_all_repositories=False, default_status=DefaultSensorStatus.STOPPED)[source]

Create a job failure sensor that sends email via the SMTP protocol.

Parameters:
  • email_from (str) – The sender email address to send the message from.

  • email_password (str) – The password of the sender.

  • email_to (List[str]) – The recipient email addresses to send the message to.

  • email_body_fn (Optional(Callable[[RunFailureSensorContext], str])) – Function that takes the RunFailureSensorContext and returns the email body you want to send. Defaults to plain text containing the error message, job name, and run ID.

  • email_subject_fn (Optional(Callable[[RunFailureSensorContext], str])) – Function that takes the RunFailureSensorContext and returns the email subject you want to send. Defaults to “Dagster Run Failed: <job_name>”.

  • smtp_host (str) – The hostname of the SMTP server. Defaults to “smtp.gmail.com”.

  • smtp_type (str) – The protocol; either “SSL” or “STARTTLS”. Defaults to SSL.

  • smtp_port (Optional[int]) – The SMTP port. Defaults to 465 for SSL, 587 for STARTTLS.

  • name (Optional[str]) – The name of the sensor. Defaults to “email_on_job_failure”.

  • dagit_base_url (Optional[str]) – The base URL of your Dagit instance. Specify this to allow messages to include deeplinks to the failed run.

  • monitored_jobs (Optional[List[Union[JobDefinition, GraphDefinition, PipelineDefinition, RepositorySelector, JobSelector]]]) – The jobs that will be monitored by this failure sensor. Defaults to None, which means the alert will be sent when any job in the repository fails. To monitor jobs in external repositories, use RepositorySelector and JobSelector.

  • monitor_all_repositories (bool) – If set to True, the sensor will monitor all runs in the Dagster instance, and an error will be raised if you also specify monitored_jobs or job_selection. Defaults to False.

  • job_selection (Optional[List[Union[JobDefinition, GraphDefinition, PipelineDefinition, RepositorySelector, JobSelector]]]) – (deprecated in favor of monitored_jobs) The jobs that will be monitored by this failure sensor. Defaults to None, which means the alert will be sent when any job in the repository fails.

  • default_status (DefaultSensorStatus) – Whether the sensor starts as running or not. The default status can be overridden from Dagit or via the GraphQL API.
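The interaction between smtp_type and smtp_port can be sketched as a small lookup. The port numbers come from the parameter descriptions above; resolve_smtp_port is a hypothetical helper, and rejecting other protocol names with a ValueError is an assumption of this sketch, not documented Dagster behavior.

```python
from typing import Optional

# Default ports as stated in the parameter descriptions.
DEFAULT_PORTS = {"SSL": 465, "STARTTLS": 587}


def resolve_smtp_port(smtp_type: str = "SSL", smtp_port: Optional[int] = None) -> int:
    """Hypothetical helper: pick the port to use for a given SMTP protocol."""
    if smtp_type not in DEFAULT_PORTS:
        # Assumption: only the two documented protocols are accepted.
        raise ValueError(f"smtp_type must be one of {sorted(DEFAULT_PORTS)}")
    # An explicitly supplied port always takes precedence over the default.
    return smtp_port if smtp_port is not None else DEFAULT_PORTS[smtp_type]
```

In practice this means smtp_port only needs to be set when the mail server listens on a non-standard port.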

Examples:

import os

from dagster import RunFailureSensorContext, make_email_on_run_failure_sensor, repository

# Default email body and subject.
email_on_run_failure = make_email_on_run_failure_sensor(
    email_from="no-reply@example.com",
    email_password=os.getenv("ALERT_EMAIL_PASSWORD"),
    email_to=["xxx@example.com"],
)

@repository
def my_repo():
    # my_job is assumed to be a job defined elsewhere.
    return [my_job, email_on_run_failure]

# Custom email body and subject.
def my_message_fn(context: RunFailureSensorContext) -> str:
    return (
        f"Job {context.pipeline_run.pipeline_name} failed! "
        f"Error: {context.failure_event.message}"
    )

email_on_run_failure = make_email_on_run_failure_sensor(
    email_from="no-reply@example.com",
    email_password=os.getenv("ALERT_EMAIL_PASSWORD"),
    email_to=["xxx@example.com"],
    email_body_fn=my_message_fn,
    email_subject_fn=lambda _: "Dagster Alert",
    dagit_base_url="http://mycoolsite.com",
)

class dagster._utils.forked_pdb.ForkedPdb(completekey='tab', stdin=None, stdout=None, skip=None, nosigint=False, readrc=True)[source]

A pdb subclass that may be used from a forked multiprocessing child.

Examples:

from dagster._utils.forked_pdb import ForkedPdb

@solid
def complex_solid(_):
    # some complicated stuff

    ForkedPdb().set_trace()

    # some other complicated stuff

You can initiate pipeline execution via dagit and use the pdb debugger to examine/step through execution at the breakpoint.
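For context on why a dedicated subclass is needed: a forked child process often has its standard input closed or redirected, so a plain pdb prompt cannot read commands. A common recipe is to re-attach standard input for the duration of the debugger interaction. The sketch below shows that recipe; SketchForkedPdb is a hypothetical name, the /dev/stdin path is POSIX-only, and whether Dagster's ForkedPdb works exactly this way is an assumption.

```python
import pdb
import sys


class SketchForkedPdb(pdb.Pdb):
    """Hypothetical sketch: re-attach stdin while the debugger prompt is active."""

    def interaction(self, *args, **kwargs):
        original_stdin = sys.stdin
        # Reopen the terminal input, since the child may have lost it (POSIX-only).
        with open("/dev/stdin") as terminal_stdin:
            sys.stdin = terminal_stdin
            try:
                super().interaction(*args, **kwargs)
            finally:
                # Always restore whatever stdin the process had before.
                sys.stdin = original_stdin
```

It would be used the same way as in the example above, by calling SketchForkedPdb().set_trace() at the point where execution should pause.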