# Source code for dagstermill.io_managers
import os
from pathlib import Path
from typing import Any, Optional, Sequence
import dagster._check as check
from dagster._config import Field
from dagster._core.definitions.events import AssetKey
from dagster._core.definitions.metadata import MetadataEntry, MetadataValue
from dagster._core.execution.context.input import InputContext
from dagster._core.execution.context.output import OutputContext
from dagster._core.storage.io_manager import IOManager, io_manager
from dagster._utils import mkdir_p
class OutputNotebookIOManager(IOManager):
    """Base IO manager for the executed notebook produced by a dagstermill op.

    This base class owns the asset-key prefix and computes the asset key
    reported for the notebook output. Storage is left to subclasses, which
    must implement ``handle_output`` and ``load_input``.
    """

    def __init__(self, asset_key_prefix: Optional[Sequence[str]] = None):
        """
        Args:
            asset_key_prefix: Path components prepended to the generated asset
                key. A plain string is treated as a single component. A falsy
                value (None, "", []) means no prefix.
        """
        if not asset_key_prefix:
            self.asset_key_prefix = []
        elif isinstance(asset_key_prefix, str):
            # A str is itself a Sequence[str]. Splatting it below would turn
            # "prefix" into ["p", "r", "e", ...], so keep it as one component.
            # This matters because the config schema types the field as ``str``.
            self.asset_key_prefix = [asset_key_prefix]
        else:
            self.asset_key_prefix = asset_key_prefix

    def get_output_asset_key(self, context: OutputContext) -> Optional[AssetKey]:
        """Return the asset key to record for the output notebook.

        Returns None when the output already has its own asset key. Otherwise
        returns ``AssetKey([*asset_key_prefix, "<step_key>_output_notebook"])``.
        """
        if context.has_asset_key:
            return None
        return AssetKey([*self.asset_key_prefix, f"{context.step_key}_output_notebook"])

    def handle_output(self, context: OutputContext, obj: bytes):
        """Persist the notebook bytes ``obj``; must be implemented by subclasses."""
        raise NotImplementedError

    def load_input(self, context: InputContext) -> Any:
        """Load a previously stored notebook; must be implemented by subclasses."""
        raise NotImplementedError
class LocalOutputNotebookIOManager(OutputNotebookIOManager):
    """Built-in IO Manager for handling output notebook.

    Writes the executed notebook bytes to a ``.ipynb`` file under ``base_dir``.
    Downstream inputs read the same file back as raw bytes.
    """

    def __init__(self, base_dir: str, asset_key_prefix: Optional[Sequence[str]] = None):
        super().__init__(asset_key_prefix=asset_key_prefix)
        self.base_dir = base_dir
        # Notebook contents are handled as bytes, so files use binary modes.
        self.write_mode = "wb"
        self.read_mode = "rb"

    def _get_path(self, context: OutputContext) -> str:
        """Automatically construct filepath.

        Uses the asset identifier when the output has an asset key, and the
        run-scoped output identifier otherwise. The components are joined
        under ``base_dir`` and the final suffix is replaced by ``.ipynb``.
        """
        if context.has_asset_key:
            keys = context.get_asset_identifier()
        else:
            keys = context.get_run_scoped_output_identifier()
        return str(Path(self.base_dir, *keys).with_suffix(".ipynb"))

    def handle_output(self, context: OutputContext, obj: bytes):
        """Write the executed notebook ``obj`` (bytes) to disk.

        Parent directories are created as needed.

        Yields:
            A ``MetadataEntry`` pointing at the written notebook file.
        """
        check.inst_param(context, "context", OutputContext)
        # the output notebook itself is stored at output_notebook_path
        output_notebook_path = self._get_path(context)
        mkdir_p(os.path.dirname(output_notebook_path))
        with open(output_notebook_path, self.write_mode) as dest_file_obj:
            dest_file_obj.write(obj)
        yield MetadataEntry("Executed notebook", value=MetadataValue.notebook(output_notebook_path))

    def load_input(self, context: InputContext) -> bytes:
        """Read the upstream output notebook back as bytes.

        Raises:
            A ``dagster._check`` error in either of these cases:
                - ``context`` is not an ``InputContext``.
                - The input has no upstream output to locate the notebook file from.
        """
        check.inst_param(context, "context", InputContext)
        upstream_output = context.upstream_output
        # Without an upstream output there is no path to read from. Fail with a
        # clear message rather than an AttributeError inside _get_path.
        check.invariant(
            upstream_output is not None,
            "LocalOutputNotebookIOManager can only load inputs that are connected "
            "to an upstream output notebook.",
        )
        # pass output notebook to downstream ops as raw bytes
        with open(self._get_path(upstream_output), self.read_mode) as file_obj:
            return file_obj.read()
@io_manager(
    config_schema={
        "asset_key_prefix": Field(str, is_required=False),
        "base_dir": Field(str, is_required=False),
    },
)
def local_output_notebook_io_manager(init_context):
    """Built-in IO Manager that handles output notebooks.

    Config:
        base_dir: Directory for notebook files. When omitted, the instance's
            storage directory is used.
        asset_key_prefix: Optional single path component prepended to the
            generated notebook asset keys.
    """
    resource_config = init_context.resource_config

    base_dir = resource_config.get("base_dir")
    if base_dir is None:
        # Resolve the fallback lazily. ``dict.get(key, default)`` would evaluate
        # the default (and touch init_context.instance) even when base_dir is
        # configured.
        base_dir = init_context.instance.storage_directory()

    asset_key_prefix = resource_config.get("asset_key_prefix", [])
    if isinstance(asset_key_prefix, str):
        # The schema types this field as ``str``. Treat it as one key component
        # rather than letting it be splatted character by character.
        asset_key_prefix = [asset_key_prefix] if asset_key_prefix else []

    return LocalOutputNotebookIOManager(
        base_dir=base_dir,
        asset_key_prefix=asset_key_prefix,
    )