Source code for dagstermill.asset_factory

import os
import sys
import tempfile
import uuid
from typing import Any, Mapping, Optional, Set, Union

import papermill
from dagstermill.compat import ExecutionError
from dagstermill.factory import get_papermill_parameters, replace_parameters
from papermill.engines import papermill_engines
from papermill.iorw import load_notebook_node, write_ipynb

import dagster._check as check
from dagster import AssetIn, AssetKey, Output, PartitionsDefinition, ResourceDefinition, asset
from dagster._core.definitions.events import CoercibleToAssetKeyPrefix
from dagster._core.definitions.utils import validate_tags
from dagster._core.execution.context.compute import SolidExecutionContext
from dagster._core.execution.context.system import StepExecutionContext
from dagster._utils import safe_tempfile_path
from dagster._utils.error import serializable_error_info_from_exc_info

from .engine import DagstermillEngine


def _dm_compute(
    name,
    notebook_path,
):
    """Build the compute function that executes a notebook for a dagstermill asset.

    Args:
        name (str): Name of the asset; used in the completion log message.
        notebook_path (str): Path of the notebook to load and execute.

    Returns:
        Callable: A function taking ``context`` and keyword ``inputs`` that writes a
        parameterized copy of the notebook, executes it through papermill with the
        ``dagstermill`` engine, and returns an ``Output`` wrapping the raw bytes of
        the executed notebook.
    """
    check.str_param(name, "name")
    check.str_param(notebook_path, "notebook_path")

    def _t_fn(context, **inputs):
        check.inst_param(context, "context", SolidExecutionContext)
        check.param_invariant(
            isinstance(context.run_config, dict),
            "context",
            "StepExecutionContext must have valid run_config",
        )

        step_execution_context: StepExecutionContext = context.get_step_execution_context()

        # Both notebooks live in a throwaway directory, so the executed notebook has to
        # be read back before this ``with`` block exits.
        with tempfile.TemporaryDirectory() as output_notebook_dir:
            with safe_tempfile_path() as output_log_path:

                # A random prefix keeps the intermediate and output file names unique.
                prefix = str(uuid.uuid4())
                parameterized_notebook_path = os.path.join(
                    output_notebook_dir, f"{prefix}-inter.ipynb"
                )

                executed_notebook_path = os.path.join(output_notebook_dir, f"{prefix}-out.ipynb")

                # Scaffold the registration here
                nb = load_notebook_node(notebook_path)
                compute_descriptor = "asset"

                nb_no_parameters = replace_parameters(
                    step_execution_context,
                    nb,
                    get_papermill_parameters(
                        step_execution_context,
                        inputs,
                        output_log_path,
                        compute_descriptor,
                    ),
                )

                write_ipynb(nb_no_parameters, parameterized_notebook_path)

                try:
                    papermill_engines.register("dagstermill", DagstermillEngine)
                    papermill.execute_notebook(
                        input_path=parameterized_notebook_path,
                        output_path=executed_notebook_path,
                        engine_name="dagstermill",
                        log_output=True,
                    )
                except Exception as ex:
                    # Log the failure for visibility; the exception is always re-raised below.
                    # ``warning`` is used because ``warn`` is a deprecated alias in the
                    # standard logging API.
                    step_execution_context.log.warning(
                        f"Error when attempting to materialize executed notebook: {serializable_error_info_from_exc_info(sys.exc_info())}"  # noqa: E501
                    )
                    # pylint: disable=no-member
                    # compat:
                    if isinstance(ex, ExecutionError) and ex.ename in ("RetryRequested", "Failure"):
                        step_execution_context.log.warning(
                            f"Encountered raised {ex.ename} in notebook. Use dagstermill.yield_event "
                            "with RetryRequested or Failure to trigger their behavior."
                        )
                    raise

            step_execution_context.log.debug(
                f"Notebook execution complete for {name} at {executed_notebook_path}."
            )
            # Return the executed notebook as bytes while the temporary directory still exists.
            with open(executed_notebook_path, "rb") as fd:
                return Output(fd.read())

    return _t_fn


def define_dagstermill_asset(
    name: str,
    notebook_path: str,
    key_prefix: Optional[CoercibleToAssetKeyPrefix] = None,
    ins: Optional[Mapping[str, AssetIn]] = None,
    non_argument_deps: Optional[Union[Set[AssetKey], Set[str]]] = None,
    metadata: Optional[Mapping[str, Any]] = None,
    config_schema: Optional[Union[Any, Mapping[str, Any]]] = None,
    required_resource_keys: Optional[Set[str]] = None,
    resource_defs: Optional[Mapping[str, ResourceDefinition]] = None,
    description: Optional[str] = None,
    partitions_def: Optional[PartitionsDefinition] = None,
    op_tags: Optional[Mapping[str, Any]] = None,
    group_name: Optional[str] = None,
    io_manager_key: Optional[str] = None,
):
    """Creates a Dagster asset for a Jupyter notebook.

    Arguments:
        name (str): The name for the asset
        notebook_path (str): Path to the backing notebook
        key_prefix (Optional[Union[str, Sequence[str]]]): If provided, the asset's key is the
            concatenation of the key_prefix and the asset's name. Each item in key_prefix must
            be a valid name in dagster (ie only contains letters, numbers, and _) and may not
            contain python reserved keywords.
        ins (Optional[Mapping[str, AssetIn]]): A dictionary that maps input names to information
            about the input.
        non_argument_deps (Optional[Union[Set[AssetKey], Set[str]]]): Set of asset keys that are
            upstream dependencies, but do not pass an input to the asset.
        config_schema (Optional[ConfigSchema]): The configuration schema for the asset's
            underlying op. If set, Dagster will check that config provided for the op matches
            this schema and fail if it does not. If not set, Dagster will accept any config
            provided for the op.
        metadata (Optional[Dict[str, Any]]): A dict of metadata entries for the asset.
        required_resource_keys (Optional[Set[str]]): Set of resource handles required by the
            notebook.
        description (Optional[str]): Description of the asset to display in Dagit. If not
            provided, a description naming the backing notebook path is used.
        partitions_def (Optional[PartitionsDefinition]): Defines the set of partition keys that
            compose the asset.
        op_tags (Optional[Dict[str, Any]]): A dictionary of tags for the op that computes the
            asset. Frameworks may expect and require certain metadata to be attached to an op.
            Values that are not strings will be json encoded and must meet the criteria that
            `json.loads(json.dumps(value)) == value`. The keys `notebook_path` and `kind` are
            reserved and may not be supplied.
        group_name (Optional[str]): A string name used to organize multiple assets into groups.
            If not provided, the name "default" is used.
        resource_defs (Optional[Mapping[str, ResourceDefinition]]): (Experimental) A mapping of
            resource keys to resource definitions. These resources will be initialized during
            execution, and can be accessed from the context within the notebook.
        io_manager_key (Optional[str]): A string key for the IO manager used to store the output
            notebook. If not provided, the default key output_notebook_io_manager will be used.

    Returns:
        The asset definition produced by applying the ``asset`` decorator to the notebook
        compute function.

    Examples:

        .. code-block:: python

            from dagstermill import define_dagstermill_asset
            from dagster import asset, AssetIn, AssetKey
            from sklearn import datasets
            import pandas as pd
            import numpy as np

            @asset
            def iris_dataset():
                sk_iris = datasets.load_iris()
                return pd.DataFrame(
                    data=np.c_[sk_iris["data"], sk_iris["target"]],
                    columns=sk_iris["feature_names"] + ["target"],
                )

            iris_kmeans_notebook = define_dagstermill_asset(
                name="iris_kmeans_notebook",
                notebook_path="/path/to/iris_kmeans.ipynb",
                ins={
                    "iris": AssetIn(key=AssetKey("iris_dataset"))
                }
            )
    """
    check.str_param(name, "name")
    check.str_param(notebook_path, "notebook_path")

    required_resource_keys = set(
        check.opt_set_param(required_resource_keys, "required_resource_keys", of_type=str)
    )
    ins = check.opt_mapping_param(ins, "ins", key_type=str, value_type=AssetIn)

    # A bare string prefix is treated as a single-element prefix.
    if isinstance(key_prefix, str):
        key_prefix = [key_prefix]

    key_prefix = check.opt_list_param(key_prefix, "key_prefix", of_type=str)

    default_description = f"This asset is backed by the notebook at {notebook_path}"
    description = check.opt_str_param(description, "description", default=default_description)

    io_mgr_key = check.opt_str_param(
        io_manager_key, "io_manager_key", default="output_notebook_io_manager"
    )

    user_tags = validate_tags(op_tags)
    if op_tags is not None:
        # These two keys are always set by this factory (see default_tags below), so
        # user-supplied values for them are rejected rather than silently overwritten.
        check.invariant(
            "notebook_path" not in op_tags,
            "user-defined op tags contains the `notebook_path` key, but the `notebook_path` key is reserved for use by Dagster",  # noqa: E501
        )
        check.invariant(
            "kind" not in op_tags,
            "user-defined op tags contains the `kind` key, but the `kind` key is reserved for use by Dagster",
        )

    default_tags = {"notebook_path": notebook_path, "kind": "ipynb"}

    return asset(
        name=name,
        key_prefix=key_prefix,
        ins=ins,
        non_argument_deps=non_argument_deps,
        metadata=metadata,
        description=description,
        config_schema=config_schema,
        required_resource_keys=required_resource_keys,
        resource_defs=resource_defs,
        partitions_def=partitions_def,
        op_tags={**user_tags, **default_tags},
        group_name=group_name,
        output_required=False,
        io_manager_key=io_mgr_key,
    )(
        _dm_compute(
            name=name,
            notebook_path=notebook_path,
        )
    )