import copy
import os
import pickle
import sys
import tempfile
import uuid
from typing import Any, Mapping, Optional, Sequence, Set, Union, cast
import nbformat
import papermill
from papermill.engines import papermill_engines
from papermill.iorw import load_notebook_node, write_ipynb
from dagster import In, OpDefinition, Out, Output
from dagster import _check as check
from dagster import _seven
from dagster._core.definitions.events import AssetMaterialization, Failure, RetryRequested
from dagster._core.definitions.metadata import MetadataValue
from dagster._core.definitions.reconstruct import ReconstructablePipeline
from dagster._core.definitions.utils import validate_tags
from dagster._core.execution.context.compute import SolidExecutionContext
from dagster._core.execution.context.input import build_input_context
from dagster._core.execution.context.system import StepExecutionContext
from dagster._core.execution.plan.outputs import StepOutputHandle
from dagster._core.storage.file_manager import FileHandle
from dagster._legacy import InputDefinition, OutputDefinition, SolidDefinition
from dagster._serdes import pack_value
from dagster._seven import get_system_temp_directory
from dagster._utils import mkdir_p, safe_tempfile_path
from dagster._utils.backcompat import rename_warning
from dagster._utils.error import serializable_error_info_from_exc_info
from .compat import ExecutionError
from .engine import DagstermillEngine
from .errors import DagstermillError
from .translator import DagsterTranslator


# https://github.com/nteract/papermill/blob/17d4bbb3960c30c263bca835e48baf34322a3530/papermill/parameterize.py
def _find_first_tagged_cell_index(nb, tag):
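    """Return the index of the first cell tagged with ``tag``, or -1 if no cell has the tag."""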
parameters_indices = []
for idx, cell in enumerate(nb.cells):
if tag in cell.metadata.tags:
parameters_indices.append(idx)
if not parameters_indices:
return -1
return parameters_indices[0]


# This is based on papermill.parameterize.parameterize_notebook
# Typically, papermill injects the injected-parameters cell *below* the parameters cell
# but we want to *replace* the parameters cell, which is what this function does.
def replace_parameters(context, nb, parameters):
"""Assigned parameters into the appropriate place in the input notebook
Args:
nb (NotebookNode): Executable notebook object
parameters (dict): Arbitrary keyword arguments to pass to the notebook parameters.
"""
check.dict_param(parameters, "parameters")
# Copy the nb object to avoid polluting the input
nb = copy.deepcopy(nb)
    # The papermill method chooses a translator based on kernel_name and language, but we always
    # use the DagsterTranslator to generate the parameter cell content, regardless of kernel_name
param_content = DagsterTranslator.codify(parameters)
newcell = nbformat.v4.new_code_cell(source=param_content)
newcell.metadata["tags"] = ["injected-parameters"]
param_cell_index = _find_first_tagged_cell_index(nb, "parameters")
injected_cell_index = _find_first_tagged_cell_index(nb, "injected-parameters")
if injected_cell_index >= 0:
# Replace the injected cell with a new version
before = nb.cells[:injected_cell_index]
after = nb.cells[injected_cell_index + 1 :]
check.int_value_param(param_cell_index, -1, "param_cell_index")
# We should have blown away the parameters cell if there is an injected-parameters cell
elif param_cell_index >= 0:
# Replace the parameter cell with the injected-parameters cell
before = nb.cells[:param_cell_index]
after = nb.cells[param_cell_index + 1 :]
else:
        # Inject at the top of the notebook; presumably the first cell includes the dagstermill import
        context.log.debug(
            "Executing notebook with no tagged parameters cell: injecting boilerplate in first "
            "cell."
        )
before = []
after = nb.cells
nb.cells = before + [newcell] + after
nb.metadata.papermill["parameters"] = _seven.json.dumps(parameters)
return nb


def get_papermill_parameters(step_context, inputs, output_log_path, compute_descriptor):
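    """Build the dict of parameters injected into the notebook, including the serialized
    executable, run, and instance refs that dagstermill needs to reconstruct the Dagster
    context inside the notebook process.
    """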
check.inst_param(step_context, "step_context", StepExecutionContext)
check.param_invariant(
isinstance(step_context.run_config, dict),
"step_context",
"StepExecutionContext must have valid run_config",
)
check.dict_param(inputs, "inputs", key_type=str)
run_id = step_context.run_id
temp_dir = get_system_temp_directory()
marshal_dir = os.path.normpath(os.path.join(temp_dir, "dagstermill", str(run_id), "marshal"))
mkdir_p(marshal_dir)
if not isinstance(step_context.pipeline, ReconstructablePipeline):
if compute_descriptor == "solid":
raise DagstermillError(
"Can't execute a dagstermill solid from a pipeline that is not reconstructable. "
"Use the reconstructable() function if executing from python"
)
elif compute_descriptor == "asset":
raise DagstermillError(
"Can't execute a dagstermill asset that is not reconstructable. "
"Use the reconstructable() function if executing from python"
)
else:
raise DagstermillError(
"Can't execute a dagstermill op from a job that is not reconstructable. "
"Use the reconstructable() function if executing from python"
)
dm_executable_dict = step_context.pipeline.to_dict()
dm_context_dict = {
"output_log_path": output_log_path,
"marshal_dir": marshal_dir,
"run_config": step_context.run_config,
}
dm_solid_handle_kwargs = step_context.solid_handle._asdict()
dm_step_key = step_context.step.key
parameters = {}
parameters["__dm_context"] = dm_context_dict
parameters["__dm_executable_dict"] = dm_executable_dict
parameters["__dm_pipeline_run_dict"] = pack_value(step_context.pipeline_run)
parameters["__dm_solid_handle_kwargs"] = dm_solid_handle_kwargs
parameters["__dm_instance_ref_dict"] = pack_value(step_context.instance.get_ref())
parameters["__dm_step_key"] = dm_step_key
parameters["__dm_input_names"] = list(inputs.keys())
return parameters


def _dm_compute(
dagster_factory_name,
name,
notebook_path,
output_notebook_name=None,
asset_key_prefix=None,
output_notebook=None,
):
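    """Return the compute function shared by the dagstermill factories: it parameterizes the
    notebook, executes it with papermill on the dagstermill engine, and yields the resulting
    outputs, materializations, and events back to Dagster.
    """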
check.str_param(name, "name")
check.str_param(notebook_path, "notebook_path")
check.opt_str_param(output_notebook_name, "output_notebook_name")
check.opt_list_param(asset_key_prefix, "asset_key_prefix")
check.opt_str_param(output_notebook, "output_notebook")
def _t_fn(step_context, inputs):
check.inst_param(step_context, "step_context", SolidExecutionContext)
        check.param_invariant(
            isinstance(step_context.run_config, dict),
            "step_context",
            "SolidExecutionContext must have valid run_config",
        )
step_execution_context = step_context.get_step_execution_context()
with tempfile.TemporaryDirectory() as output_notebook_dir:
with safe_tempfile_path() as output_log_path:
prefix = str(uuid.uuid4())
parameterized_notebook_path = os.path.join(
output_notebook_dir, f"{prefix}-inter.ipynb"
)
executed_notebook_path = os.path.join(output_notebook_dir, f"{prefix}-out.ipynb")
                # Load the source notebook and inject the Dagster parameters cell
nb = load_notebook_node(notebook_path)
compute_descriptor = (
"solid" if dagster_factory_name == "define_dagstermill_solid" else "op"
)
nb_no_parameters = replace_parameters(
step_execution_context,
nb,
get_papermill_parameters(
step_execution_context,
inputs,
output_log_path,
compute_descriptor,
),
)
write_ipynb(nb_no_parameters, parameterized_notebook_path)
try:
papermill_engines.register("dagstermill", DagstermillEngine)
papermill.execute_notebook(
input_path=parameterized_notebook_path,
output_path=executed_notebook_path,
engine_name="dagstermill",
log_output=True,
)
except Exception as ex:
step_execution_context.log.warn(
"Error when attempting to materialize executed notebook: {exc}".format(
exc=str(serializable_error_info_from_exc_info(sys.exc_info()))
)
)
# pylint: disable=no-member
# compat:
if isinstance(ex, ExecutionError) and (
ex.ename == "RetryRequested" or ex.ename == "Failure"
):
step_execution_context.log.warn(
f"Encountered raised {ex.ename} in notebook. Use dagstermill.yield_event "
"with RetryRequested or Failure to trigger their behavior."
)
raise
step_execution_context.log.debug(
"Notebook execution complete for {name} at {executed_notebook_path}.".format(
name=name,
executed_notebook_path=executed_notebook_path,
)
)
if output_notebook_name is not None:
# yield output notebook binary stream as a solid output
with open(executed_notebook_path, "rb") as fd:
yield Output(fd.read(), output_notebook_name)
else:
# backcompat
executed_notebook_file_handle = None
try:
                    # use binary mode when moving the file since certain file managers, such as S3,
                    # may try to hash the contents
with open(executed_notebook_path, "rb") as fd:
executed_notebook_file_handle = step_context.resources.file_manager.write(
fd, mode="wb", ext="ipynb"
)
executed_notebook_materialization_path = (
executed_notebook_file_handle.path_desc
)
yield AssetMaterialization(
asset_key=(asset_key_prefix + [f"{name}_output_notebook"]),
description="Location of output notebook in file manager",
metadata={
"path": MetadataValue.path(executed_notebook_materialization_path),
},
)
except Exception:
                    # if file manager writing fails, e.g. because a file manager is not provided, we
                    # log a warning and fall back to the previously stored temp executed notebook.
step_context.log.warning(
"Error when attempting to materialize executed notebook using file manager: "
f"{str(serializable_error_info_from_exc_info(sys.exc_info()))}"
f"\nNow falling back to local: notebook execution was temporarily materialized at {executed_notebook_path}"
"\nIf you have supplied a file manager and expect to use it for materializing the "
'notebook, please include "file_manager" in the `required_resource_keys` argument '
f"to `{dagster_factory_name}`"
)
if output_notebook is not None:
yield Output(executed_notebook_file_handle, output_notebook)
# deferred import for perf
import scrapbook
output_nb = scrapbook.read_notebook(executed_notebook_path)
for (
output_name,
_,
) in step_execution_context.solid_def.output_dict.items():
data_dict = output_nb.scraps.data_dict
if output_name in data_dict:
# read outputs that were passed out of process via io manager from `yield_result`
step_output_handle = StepOutputHandle(
step_key=step_execution_context.step.key,
output_name=output_name,
)
output_context = step_execution_context.get_output_context(step_output_handle)
io_manager = step_execution_context.get_io_manager(step_output_handle)
value = io_manager.load_input(
build_input_context(
upstream_output=output_context, dagster_type=output_context.dagster_type
)
)
yield Output(value, output_name)
for key, value in output_nb.scraps.items():
if key.startswith("event-"):
with open(value.data, "rb") as fd:
event = pickle.loads(fd.read())
if isinstance(event, (Failure, RetryRequested)):
raise event
else:
yield event
return _t_fn


def define_dagstermill_solid(
name: str,
notebook_path: str,
input_defs: Optional[Sequence[InputDefinition]] = None,
output_defs: Optional[Sequence[OutputDefinition]] = None,
config_schema: Optional[Union[Any, Mapping[str, Any]]] = None,
required_resource_keys: Optional[Set[str]] = None,
output_notebook: Optional[str] = None,
output_notebook_name: Optional[str] = None,
asset_key_prefix: Optional[Union[Sequence[str], str]] = None,
description: Optional[str] = None,
tags: Optional[Mapping[str, Any]] = None,
):
"""Wrap a Jupyter notebook in a solid.
Arguments:
name (str): The name of the solid.
notebook_path (str): Path to the backing notebook.
input_defs (Optional[List[InputDefinition]]): The solid's inputs.
output_defs (Optional[List[OutputDefinition]]): The solid's outputs. Your notebook should
call :py:func:`~dagstermill.yield_result` to yield each of these outputs.
required_resource_keys (Optional[Set[str]]): The string names of any required resources.
output_notebook (Optional[str]): If set, will be used as the name of an injected output of
type :py:class:`~dagster.FileHandle` that will point to the executed notebook (in
addition to the :py:class:`~dagster.AssetMaterialization` that is always created). This
respects the :py:class:`~dagster._core.storage.file_manager.FileManager` configured on
the pipeline resources via the "file_manager" resource key, so, e.g.,
            if :py:class:`~dagster_aws.s3.s3_file_manager` is configured, the output will be a
            :py:class:`~dagster_aws.s3.S3FileHandle`.
        output_notebook_name (Optional[str]): If set, will be used as the name of an injected output
            of type :py:class:`~dagster.BufferedIOBase` that is the file object of the executed
            notebook (in addition to the :py:class:`~dagster.AssetMaterialization` that is always
            created). It allows the downstream solids to access the executed notebook via a file
            object.
asset_key_prefix (Optional[Union[List[str], str]]): If set, will be used to prefix the
asset keys for materialized notebooks.
description (Optional[str]): If set, description used for solid.
tags (Optional[Dict[str, str]]): If set, additional tags used to annotate solid.
Dagster uses the tag keys `notebook_path` and `kind`, which cannot be
overwritten by the user.
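
    Examples:
        A minimal usage sketch (the notebook path is a placeholder):

        .. code-block:: python

            from dagstermill import define_dagstermill_solid

            my_notebook_solid = define_dagstermill_solid(
                name="my_notebook",
                notebook_path="path/to/my_notebook.ipynb",
            )
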
Returns:
:py:class:`~dagster.SolidDefinition`
"""
check.str_param(name, "name")
check.str_param(notebook_path, "notebook_path")
input_defs = check.opt_sequence_param(input_defs, "input_defs", of_type=InputDefinition)
output_defs = check.opt_sequence_param(output_defs, "output_defs", of_type=OutputDefinition)
required_resource_keys = set(
check.opt_set_param(required_resource_keys, "required_resource_keys", of_type=str)
)
extra_output_defs = []
if output_notebook_name is not None:
required_resource_keys.add("output_notebook_io_manager")
extra_output_defs.append(
OutputDefinition(name=output_notebook_name, io_manager_key="output_notebook_io_manager")
)
    # backcompat
if output_notebook is not None:
rename_warning(
new_name="output_notebook_name",
old_name="output_notebook",
breaking_version="0.14.0",
)
required_resource_keys.add("file_manager")
extra_output_defs.append(OutputDefinition(dagster_type=FileHandle, name=output_notebook))
if isinstance(asset_key_prefix, str):
asset_key_prefix = [asset_key_prefix]
asset_key_prefix = check.opt_list_param(asset_key_prefix, "asset_key_prefix", of_type=str)
default_description = f"This solid is backed by the notebook at {notebook_path}"
description = check.opt_str_param(description, "description", default=default_description)
user_tags = validate_tags(tags)
if tags is not None:
check.invariant(
"notebook_path" not in tags,
"user-defined solid tags contains the `notebook_path` key, but the `notebook_path` key is reserved for use by Dagster",
)
check.invariant(
"kind" not in tags,
"user-defined solid tags contains the `kind` key, but the `kind` key is reserved for use by Dagster",
)
default_tags = {"notebook_path": notebook_path, "kind": "ipynb"}
return SolidDefinition(
name=name,
input_defs=input_defs,
compute_fn=_dm_compute(
"define_dagstermill_solid",
name,
notebook_path,
output_notebook_name,
asset_key_prefix=asset_key_prefix,
            output_notebook=output_notebook,  # backcompat
),
output_defs=[*output_defs, *extra_output_defs],
config_schema=config_schema,
required_resource_keys=required_resource_keys,
description=description,
tags={**user_tags, **default_tags},
)


def define_dagstermill_op(
name: str,
notebook_path: str,
ins: Optional[Mapping[str, In]] = None,
outs: Optional[Mapping[str, Out]] = None,
config_schema: Optional[Union[Any, Mapping[str, Any]]] = None,
required_resource_keys: Optional[Set[str]] = None,
output_notebook_name: Optional[str] = None,
asset_key_prefix: Optional[Union[Sequence[str], str]] = None,
description: Optional[str] = None,
tags: Optional[Mapping[str, Any]] = None,
io_manager_key: Optional[str] = None,
):
"""Wrap a Jupyter notebook in a op.
Arguments:
name (str): The name of the op.
notebook_path (str): Path to the backing notebook.
ins (Optional[Mapping[str, In]]): The op's inputs.
outs (Optional[Mapping[str, Out]]): The op's outputs. Your notebook should
call :py:func:`~dagstermill.yield_result` to yield each of these outputs.
required_resource_keys (Optional[Set[str]]): The string names of any required resources.
        output_notebook_name (Optional[str]): If set, will be used as the name of an injected output
            of type :py:class:`~dagster.BufferedIOBase` that is the file object of the executed
            notebook (in addition to the :py:class:`~dagster.AssetMaterialization` that is always
            created). It allows the downstream ops to access the executed notebook via a file
            object.
asset_key_prefix (Optional[Union[List[str], str]]): If set, will be used to prefix the
asset keys for materialized notebooks.
description (Optional[str]): If set, description used for op.
tags (Optional[Dict[str, str]]): If set, additional tags used to annotate op.
Dagster uses the tag keys `notebook_path` and `kind`, which cannot be
overwritten by the user.
        io_manager_key (Optional[str]): If using `output_notebook_name`, you can additionally provide
            a string key for the IO manager used to store the output notebook.
            If not provided, the default key `output_notebook_io_manager` will be used.
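
    Examples:
        A minimal usage sketch (the notebook path is a placeholder; the local output notebook
        IO manager shown is one way to satisfy the injected output's resource key):

        .. code-block:: python

            from dagster import job
            from dagstermill import define_dagstermill_op, local_output_notebook_io_manager

            my_notebook_op = define_dagstermill_op(
                name="my_notebook",
                notebook_path="path/to/my_notebook.ipynb",
                output_notebook_name="output_notebook",
            )

            @job(resource_defs={"output_notebook_io_manager": local_output_notebook_io_manager})
            def my_job():
                my_notebook_op()
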
Returns:
:py:class:`~dagster.OpDefinition`
"""
check.str_param(name, "name")
check.str_param(notebook_path, "notebook_path")
required_resource_keys = set(
check.opt_set_param(required_resource_keys, "required_resource_keys", of_type=str)
)
outs = check.opt_mapping_param(outs, "outs", key_type=str, value_type=Out)
ins = check.opt_mapping_param(ins, "ins", key_type=str, value_type=In)
if output_notebook_name is not None:
io_mgr_key = check.opt_str_param(
io_manager_key, "io_manager_key", default="output_notebook_io_manager"
)
required_resource_keys.add(io_mgr_key)
outs = {
**outs,
cast(str, output_notebook_name): Out(io_manager_key=io_mgr_key),
}
if isinstance(asset_key_prefix, str):
asset_key_prefix = [asset_key_prefix]
asset_key_prefix = check.opt_list_param(asset_key_prefix, "asset_key_prefix", of_type=str)
default_description = f"This op is backed by the notebook at {notebook_path}"
description = check.opt_str_param(description, "description", default=default_description)
user_tags = validate_tags(tags)
if tags is not None:
check.invariant(
"notebook_path" not in tags,
"user-defined solid tags contains the `notebook_path` key, but the `notebook_path` key is reserved for use by Dagster",
)
check.invariant(
"kind" not in tags,
"user-defined solid tags contains the `kind` key, but the `kind` key is reserved for use by Dagster",
)
default_tags = {"notebook_path": notebook_path, "kind": "ipynb"}
return OpDefinition(
name=name,
compute_fn=_dm_compute(
"define_dagstermill_op",
name,
notebook_path,
output_notebook_name,
asset_key_prefix=asset_key_prefix,
),
ins=ins,
outs=outs,
config_schema=config_schema,
required_resource_keys=required_resource_keys,
description=description,
tags={**user_tags, **default_tags},
)