# Source code for dagster._core.definitions.materialize

import warnings
from typing import TYPE_CHECKING, Any, Mapping, Optional, Sequence, Set, Union

import dagster._check as check
from dagster._utils import merge_dicts
from dagster._utils.backcompat import ExperimentalWarning

from ..errors import DagsterInvariantViolationError
from ..instance import DagsterInstance
from ..storage.io_manager import IOManagerDefinition
from ..storage.mem_io_manager import mem_io_manager
from .assets import AssetsDefinition
from .assets_job import build_assets_job
from .job_definition import default_job_io_manager_with_fs_io_manager_schema
from .source_asset import SourceAsset
from .utils import DEFAULT_IO_MANAGER_KEY

if TYPE_CHECKING:
    from ..execution.execute_in_process_result import ExecuteInProcessResult


def materialize(
    assets: Sequence[Union[AssetsDefinition, SourceAsset]],
    run_config: Any = None,
    instance: Optional[DagsterInstance] = None,
    resources: Optional[Mapping[str, object]] = None,
    partition_key: Optional[str] = None,
    raise_on_error: bool = True,
) -> "ExecuteInProcessResult":
    """
    Executes a single-threaded, in-process run which materializes provided assets.

    By default, will materialize assets to the local filesystem.

    Args:
        assets (Sequence[Union[AssetsDefinition, SourceAsset]]):
            The assets to materialize. Can also provide :py:class:`SourceAsset` objects to fill
            dependencies for asset defs.
        run_config (Optional[Any]): The run config to use for the run that materializes the
            assets.
        instance (Optional[DagsterInstance]): The instance to execute against. Passed through
            unchanged to ``execute_in_process``.
        resources (Optional[Mapping[str, object]]):
            The resources needed for execution. Can provide resource instances directly, or
            resource definitions. Note that if provided resources conflict with resources
            directly on assets, an error will be thrown.
        partition_key: (Optional[str])
            The string partition key that specifies the run config to execute. Can only be used
            to select run config for assets with partitioned config.
        raise_on_error (bool): Passed through unchanged to ``execute_in_process``. Defaults to
            ``True``.

    Returns:
        ExecuteInProcessResult: The result of the execution.
    """
    # Imported locally rather than at module level, as in the original block.
    from ..execution.build_resources import wrap_resources_for_execution

    assets = check.sequence_param(assets, "assets", of_type=(AssetsDefinition, SourceAsset))

    # Split the mixed input into materializable asset definitions and source assets.
    assets_defs = []
    source_assets = []
    for candidate in assets:
        if isinstance(candidate, AssetsDefinition):
            assets_defs.append(candidate)
        if isinstance(candidate, SourceAsset):
            source_assets.append(candidate)

    instance = check.opt_inst_param(instance, "instance", DagsterInstance)
    partition_key = check.opt_str_param(partition_key, "partition_key")
    resources = check.opt_mapping_param(resources, "resources", key_type=str)

    # The default io manager is listed first so that caller-provided resources are merged
    # on top of it.
    default_io_manager = {DEFAULT_IO_MANAGER_KEY: default_job_io_manager_with_fs_io_manager_schema}
    user_resource_defs = wrap_resources_for_execution(resources)
    resource_defs = merge_dicts(default_io_manager, user_resource_defs)

    with warnings.catch_warnings():
        # Building the job is an internal detail here; hide its experimental warning.
        warnings.filterwarnings(
            "ignore", category=ExperimentalWarning, message=".*build_assets_job.*"
        )

        materialization_job = build_assets_job(
            "in_process_materialization_job",
            assets=assets_defs,
            source_assets=source_assets,
            resource_defs=resource_defs,
        )
        return materialization_job.execute_in_process(
            run_config=run_config,
            instance=instance,
            partition_key=partition_key,
            raise_on_error=raise_on_error,
        )


def materialize_to_memory(
    assets: Sequence[Union[AssetsDefinition, SourceAsset]],
    run_config: Any = None,
    instance: Optional[DagsterInstance] = None,
    resources: Optional[Mapping[str, object]] = None,
    partition_key: Optional[str] = None,
    raise_on_error: bool = True,
) -> "ExecuteInProcessResult":
    """
    Executes a single-threaded, in-process run which materializes provided assets in memory.

    Will explicitly use :py:func:`mem_io_manager` for all required io manager
    keys. If any io managers are directly provided using the `resources`
    argument, a :py:class:`DagsterInvariantViolationError` will be thrown.

    Args:
        assets (Sequence[Union[AssetsDefinition, SourceAsset]]):
            The assets to materialize. Can also provide :py:class:`SourceAsset` objects to fill
            dependencies for asset defs.
        run_config (Optional[Any]): The run config to use for the run that materializes the
            assets.
        instance (Optional[DagsterInstance]): The instance to execute against. Passed through
            unchanged to ``execute_in_process``.
        resources (Optional[Mapping[str, object]]):
            The resources needed for execution. Can provide resource instances
            directly, or resource definitions. If provided resources
            conflict with resources directly on assets, an error will be thrown.
        partition_key: (Optional[str])
            The string partition key that specifies the run config to execute. Can only be used
            to select run config for assets with partitioned config.
        raise_on_error (bool): Passed through unchanged to ``execute_in_process``. Defaults to
            ``True``.

    Returns:
        ExecuteInProcessResult: The result of the execution.

    Raises:
        DagsterInvariantViolationError: If a resource is provided (via ``resources`` or on any
            of the assets) for a key that one of the assets requires as an io manager.
    """
    # Imported locally rather than at module level, mirroring `materialize`.
    from ..execution.build_resources import wrap_resources_for_execution

    # Validate every argument up front, the same way `materialize` does, so malformed input
    # fails with a check error before any resource inspection happens.
    assets = check.sequence_param(assets, "assets", of_type=(AssetsDefinition, SourceAsset))
    instance = check.opt_inst_param(instance, "instance", DagsterInstance)
    partition_key = check.opt_str_param(partition_key, "partition_key")
    resources = check.opt_mapping_param(resources, "resources", key_type=str)

    resource_defs = wrap_resources_for_execution(resources)

    # Gather every resource key that is already bound, either passed in directly or attached
    # to an asset. Only the keys matter for the io manager conflict check below.
    provided_resource_keys = set(resource_defs)
    for asset in assets:
        provided_resource_keys.update(asset.resource_defs)

    io_manager_keys = _get_required_io_manager_keys(assets)
    # Sorted so that the key reported in the error does not depend on set ordering.
    for io_manager_key in sorted(io_manager_keys):
        if io_manager_key in provided_resource_keys:
            raise DagsterInvariantViolationError(
                "Attempted to call `materialize_to_memory` with a resource "
                f"provided for io manager key '{io_manager_key}'. Do not "
                "provide resources for io manager keys when calling "
                "`materialize_to_memory`, as it will override io management "
                "behavior for all keys."
            )

    # Bind the in-memory io manager to every required io manager key.
    resource_defs = merge_dicts({key: mem_io_manager for key in io_manager_keys}, resource_defs)

    assets_defs = [the_def for the_def in assets if isinstance(the_def, AssetsDefinition)]
    source_assets = [the_def for the_def in assets if isinstance(the_def, SourceAsset)]

    with warnings.catch_warnings():
        # Building the job is an internal detail here; hide its experimental warning.
        warnings.filterwarnings(
            "ignore", category=ExperimentalWarning, message=".*build_assets_job.*"
        )

        return build_assets_job(
            "in_process_materialization_job",
            assets=assets_defs,
            source_assets=source_assets,
            resource_defs=resource_defs,
        ).execute_in_process(
            run_config=run_config,
            instance=instance,
            partition_key=partition_key,
            raise_on_error=raise_on_error,
        )


def _get_required_io_manager_keys(
    assets: Sequence[Union[AssetsDefinition, SourceAsset]]
) -> Set[str]:
    """Return the resource keys that the given assets require as io managers.

    A resource requirement counts as an io manager requirement when its ``expected_type``
    equals :py:class:`IOManagerDefinition`.

    Args:
        assets: The assets whose resource requirements are inspected.

    Returns:
        Set[str]: The keys of all io manager requirements found; empty if there are none.
    """
    return {
        requirement.key
        for asset in assets
        for requirement in asset.get_resource_requirements()
        if requirement.expected_type == IOManagerDefinition
    }