Software-Defined Assets

An asset is an object in persistent storage, such as a table, file, or persisted machine learning model. A software-defined asset is a Dagster object that couples an asset to the function and upstream assets that are used to produce its contents.

@dagster.asset(compute_fn=None, *, name=None, key_prefix=None, ins=None, non_argument_deps=None, metadata=None, description=None, config_schema=None, required_resource_keys=None, resource_defs=None, io_manager_def=None, io_manager_key=None, compute_kind=None, dagster_type=None, partitions_def=None, op_tags=None, group_name=None, output_required=True, freshness_policy=None, retry_policy=None, op_version=None)[source]

Create a definition for how to compute an asset.

A software-defined asset is the combination of:
  1. An asset key, e.g. the name of a table.

  2. A function, which can be run to compute the contents of the asset.

  3. A set of upstream assets that are provided as inputs to the function when computing the asset.

Unlike an op, whose dependencies are determined by the graph it lives inside, an asset knows about the upstream assets it depends on. The upstream assets are inferred from the arguments to the decorated function. The name of the argument designates the name of the upstream asset.

Parameters:
  • name (Optional[str]) – The name of the asset. If not provided, defaults to the name of the decorated function. The asset’s name must be a valid name in dagster (ie only contains letters, numbers, and _) and may not contain python reserved keywords.

  • key_prefix (Optional[Union[str, Sequence[str]]]) – If provided, the asset’s key is the concatenation of the key_prefix and the asset’s name, which defaults to the name of the decorated function. Each item in key_prefix must be a valid name in dagster (ie only contains letters, numbers, and _) and may not contain python reserved keywords.

  • ins (Optional[Mapping[str, AssetIn]]) – A dictionary that maps input names to information about the input.

  • non_argument_deps (Optional[Union[Set[AssetKey], Set[str]]]) – Set of asset keys that are upstream dependencies, but do not pass an input to the asset.

  • config_schema (Optional[ConfigSchema) – The configuration schema for the asset’s underlying op. If set, Dagster will check that config provided for the op matches this schema and fail if it does not. If not set, Dagster will accept any config provided for the op.

  • metadata (Optional[Dict[str, Any]]) – A dict of metadata entries for the asset.

  • required_resource_keys (Optional[Set[str]]) – Set of resource handles required by the op.

  • io_manager_key (Optional[str]) – The resource key of the IOManager used for storing the output of the op as an asset, and for loading it in downstream ops (default: “io_manager”). Only one of io_manager_key and io_manager_def can be provided.

  • io_manager_def (Optional[IOManagerDefinition]) – (Experimental) The definition of the IOManager used for storing the output of the op as an asset, and for loading it in downstream ops. Only one of io_manager_def and io_manager_key can be provided.

  • compute_kind (Optional[str]) – A string to represent the kind of computation that produces the asset, e.g. “dbt” or “spark”. It will be displayed in Dagit as a badge on the asset.

  • dagster_type (Optional[DagsterType]) – Allows specifying type validation functions that will be executed on the output of the decorated function after it runs.

  • partitions_def (Optional[PartitionsDefinition]) – Defines the set of partition keys that compose the asset.

  • op_tags (Optional[Dict[str, Any]]) – A dictionary of tags for the op that computes the asset. Frameworks may expect and require certain metadata to be attached to a op. Values that are not strings will be json encoded and must meet the criteria that json.loads(json.dumps(value)) == value.

  • group_name (Optional[str]) – A string name used to organize multiple assets into groups. If not provided, the name “default” is used.

  • resource_defs (Optional[Mapping[str, ResourceDefinition]]) – (Experimental) A mapping of resource keys to resource definitions. These resources will be initialized during execution, and can be accessed from the context within the body of the function.

  • output_required (bool) – Whether the decorated function will always materialize an asset. Defaults to True. If False, the function can return None, which will not be materialized to storage and will halt execution of downstream assets.

  • freshness_policy (FreshnessPolicy) – A constraint telling Dagster how often this asset is intended to be updated with respect to its root data.

  • retry_policy (Optional[RetryPolicy]) – The retry policy for the op that computes the asset.

  • op_version (Optional[str]) – (Experimental) Version string passed to the op underlying the asset.

Examples

@asset
def my_asset(my_upstream_asset: int) -> int:
    return my_upstream_asset + 1
class dagster.AssetIn(key=None, metadata=None, key_prefix=None, asset_key=None, input_manager_key=None, partition_mapping=None, dagster_type=<class 'dagster._core.definitions.utils.NoValueSentinel'>)[source]

Defines an asset dependency.

key_prefix

If provided, the asset’s key is the concatenation of the key_prefix and the input name. Only one of the “key_prefix” and “key” arguments should be provided.

Type:

Optional[Union[str, Sequence[str]]]

key

The asset’s key. Only one of the “key_prefix” and “key” arguments should be provided.

Type:

Optional[Union[str, Sequence[str], AssetKey]]

metadata

A dict of the metadata for the input. For example, if you only need a subset of columns from an upstream table, you could include that in metadata and the IO manager that loads the upstream table could use the metadata to determine which columns to load.

Type:

Optional[Dict[str, Any]]

partition_mapping

Defines what partitions to depend on in the upstream asset. If not provided, defaults to the default partition mapping for the partitions definition, which is typically maps partition keys to the same partition keys in upstream assets.

Type:

Optional[PartitionMapping]

dagster_type

Allows specifying type validation functions that will be executed on the input of the decorated function before it runs.

Type:

DagsterType

class dagster.SourceAsset(key, metadata=None, io_manager_key=None, io_manager_def=None, description=None, partitions_def=None, _metadata_entries=None, group_name=None, resource_defs=None, observe_fn=None)[source]

A SourceAsset represents an asset that will be loaded by (but not updated by) Dagster.

key

The key of the asset.

Type:

Union[AssetKey, Sequence[str], str]

metadata_entries

Metadata associated with the asset.

Type:

List[MetadataEntry]

io_manager_key

The key for the IOManager that will be used to load the contents of the asset when it’s used as an input to other assets inside a job.

Type:

Optional[str]

io_manager_def

(Experimental) The definition of the IOManager that will be used to load the contents of the asset when it’s used as an input to other assets inside a job.

Type:

Optional[IOManagerDefinition]

resource_defs

(Experimental) resource definitions that may be required by the dagster.IOManagerDefinition provided in the io_manager_def argument.

Type:

Optional[Mapping[str, ResourceDefinition]]

description

The description of the asset.

Type:

Optional[str]

partitions_def

Defines the set of partition keys that compose the asset.

Type:

Optional[PartitionsDefinition]

observe_fn
Type:

Optional[SourceAssetObserveFunction]

dagster.define_asset_job(name, selection=None, config=None, description=None, tags=None, partitions_def=None, executor_def=None)[source]

Creates a definition of a job which will materialize a selection of assets. This will only be resolved to a JobDefinition once placed in a repository.

Parameters:
  • name (str) – The name for the job.

  • selection (Union[str, Sequence[str], AssetSelection]) –

    A selection over the set of Assets available on your repository. This can be a string such as “my_asset*”, a list of such strings (representing a union of these selections), or an AssetSelection object.

    This selection will be resolved to a set of Assets once the repository is loaded with a set of AssetsDefinitions.

  • config

    Describes how the Job is parameterized at runtime.

    If no value is provided, then the schema for the job’s run config is a standard format based on its solids and resources.

    If a dictionary is provided, then it must conform to the standard config schema, and it will be used as the job’s run config for the job whenever the job is executed. The values provided will be viewable and editable in the Dagit playground, so be careful with secrets.

    If a ConfigMapping object is provided, then the schema for the job’s run config is determined by the config mapping, and the ConfigMapping, which should return configuration in the standard format to configure the job.

  • tags (Optional[Mapping[str, Any]]) – Arbitrary information that will be attached to the execution of the Job. Values that are not strings will be json encoded and must meet the criteria that json.loads(json.dumps(value)) == value. These tag values may be overwritten by tag values provided at invocation time.

  • description (Optional[str]) – A description for the Job.

  • partitions_def (Optional[PartitionsDefinition]) – Defines the set of partitions for this job. All AssetDefinitions selected for this job must have a matching PartitionsDefinition.

  • executor_def (Optional[ExecutorDefinition]) – How this Job will be executed. Defaults to multi_or_in_process_executor, which can be switched between multi-process and in-process modes of execution. The default mode of execution is multi-process.

Returns:

The job, which can be placed inside a repository.

Return type:

UnresolvedAssetJobDefinition

Examples

# A job that targets all assets in the repository:
@asset
def asset1():
    ...

@repository
def repo():
    return [asset1, define_asset_job("all_assets")]

# A job that targets all the assets in a group:
@repository
def repo():
    return [
        assets,
        define_asset_job("marketing_job", selection=AssetSelection.groups("marketing")),
    ]

# Resources are supplied to the assets, not the job:
@asset(required_resource_keys={"slack_client"})
def asset1():
    ...

@repository
def prod_repo():
    return [
        *with_resources([asset1], resource_defs={"slack_client": prod_slack_client}),
        define_asset_job("all_assets"),
    ]
class dagster.AssetSelection[source]

An AssetSelection defines a query over a set of assets, normally all the assets in a repository.

You can use the “|”, “&”, and “-” operators to create unions, intersections, and differences of asset selections, respectively.

AssetSelections are typically used with define_asset_job().

Examples

# Select all assets in group "marketing":
AssetSelection.groups("marketing")

# Select all assets in group "marketing", as well as the asset with key "promotion":
AssetSelection.groups("marketing") | AssetSelection.keys("promotion")

# Select all assets in group "marketing" that are downstream of asset "leads":
AssetSelection.groups("marketing") & AssetSelection.keys("leads").downstream()

# Select all assets except for those in group "marketing"
AssetSelection.all() - AssetSelection.groups("marketing")
static all()[source]

Returns a selection that includes all assets.

static assets(*assets_defs)[source]

Returns a selection that includes all of the provided assets.

downstream(depth=None, include_self=True)[source]

Returns a selection that includes all assets that are downstream of any of the assets in this selection, selecting the assets in this selection by default. Iterates through each asset in this selection and returns the union of all downstream assets.

depth (Optional[int]): If provided, then only include assets to the given depth. A depth

of 2 means all assets that are children or grandchildren of the assets in this selection.

include_self (bool): If True, then include the assets in this selection in the result.

If the include_self flag is False, return each downstream asset that is not part of the original selection. By default, set to True.

static groups(*group_strs)[source]

Returns a selection that includes assets that belong to any of the provided groups

static keys(*asset_keys)[source]

Returns a selection that includes assets with any of the provided keys.

sinks()[source]

Given an asset selection, returns a new asset selection that contains all of the sink assets within the original asset selection.

A sink asset is an asset that has no downstream dependencies within the asset selection. The sink asset can have downstream dependencies outside of the asset selection.

sources()[source]

Given an asset selection, returns a new asset selection that contains all of the source assets within the original asset selection.

A source asset is an asset that has no upstream dependencies within the asset selection. The source asset can have downstream dependencies outside of the asset selection.

upstream(depth=None, include_self=True)[source]

Returns a selection that includes all assets that are upstream of any of the assets in this selection, selecting the assets in this selection by default. Iterates through each asset in this selection and returns the union of all downstream assets.

Parameters:
  • depth (Optional[int]) – If provided, then only include assets to the given depth. A depth of 2 means all assets that are parents or grandparents of the assets in this selection.

  • include_self (bool) – If True, then include the assets in this selection in the result. If the include_self flag is False, return each upstream asset that is not part of the original selection. By default, set to True.

dagster.load_assets_from_modules(modules, group_name=None, key_prefix=None)[source]

Constructs a list of assets and source assets from the given modules.

Parameters:
  • modules (Iterable[ModuleType]) – The Python modules to look for assets inside.

  • group_name (Optional[str]) – Group name to apply to the loaded assets. The returned assets will be copies of the loaded objects, with the group name added.

  • key_prefix (Optional[Union[str, List[str]]]) – Prefix to prepend to the keys of the loaded assets. The returned assets will be copies of the loaded objects, with the prefix prepended.

Returns:

A list containing assets and source assets defined in the given modules.

Return type:

List[Union[AssetsDefinition, SourceAsset]]

dagster.load_assets_from_current_module(group_name=None, key_prefix=None)[source]

Constructs a list of assets and source assets from the module where this function is called.

Parameters:
  • group_name (Optional[str]) – Group name to apply to the loaded assets. The returned assets will be copies of the loaded objects, with the group name added.

  • key_prefix (Optional[Union[str, List[str]]]) – Prefix to prepend to the keys of the loaded assets. The returned assets will be copies of the loaded objects, with the prefix prepended.

Returns:

A list containing assets and source assets defined in the module.

Return type:

List[Union[AssetsDefinition, SourceAsset]]

dagster.load_assets_from_package_module(package_module, group_name=None, key_prefix=None)[source]

Constructs a list of assets and source assets that includes all asset definitions and source assets in all sub-modules of the given package module.

A package module is the result of importing a package.

Parameters:
  • package_module (ModuleType) – The package module to looks for assets inside.

  • group_name (Optional[str]) – Group name to apply to the loaded assets. The returned assets will be copies of the loaded objects, with the group name added.

  • key_prefix (Optional[Union[str, List[str]]]) – Prefix to prepend to the keys of the loaded assets. The returned assets will be copies of the loaded objects, with the prefix prepended.

Returns:

A list containing assets and source assets defined in the module.

Return type:

List[Union[AssetsDefinition, SourceAsset]]

dagster.load_assets_from_package_name(package_name, group_name=None, key_prefix=None)[source]

Constructs a list of assets and source assets that include all asset definitions and source assets in all sub-modules of the given package.

Parameters:
  • package_name (str) – The name of a Python package to look for assets inside.

  • group_name (Optional[str]) – Group name to apply to the loaded assets. The returned assets will be copies of the loaded objects, with the group name added.

  • key_prefix (Optional[Union[str, List[str]]]) – Prefix to prepend to the keys of the loaded assets. The returned assets will be copies of the loaded objects, with the prefix prepended.

Returns:

A list containing assets and source assets defined in the module.

Return type:

List[Union[AssetsDefinition, SourceAsset]]

class dagster.AssetsDefinition(*, keys_by_input_name, keys_by_output_name, node_def, partitions_def=None, partition_mappings=None, asset_deps=None, selected_asset_keys=None, can_subset=False, resource_defs=None, group_names_by_key=None, metadata_by_key=None, freshness_policies_by_key=None)[source]

Defines a set of assets that are produced by the same op or graph.

AssetsDefinitions are typically not instantiated directly, but rather produced using the @asset or @multi_asset decorators.

asset_deps

Maps assets that are produced by this definition to assets that they depend on. The dependencies can be either “internal”, meaning that they refer to other assets that are produced by this definition, or “external”, meaning that they refer to assets that aren’t produced by this definition.

Type:

Mapping[AssetKey, AbstractSet[AssetKey]]

static from_graph(graph_def, *, keys_by_input_name=None, keys_by_output_name=None, key_prefix=None, internal_asset_deps=None, partitions_def=None, group_name=None, resource_defs=None, partition_mappings=None, metadata_by_output_name=None, can_subset=False)[source]

Constructs an AssetsDefinition from a GraphDefinition.

Parameters:
  • graph_def (GraphDefinition) – The GraphDefinition that is an asset.

  • keys_by_input_name (Optional[Mapping[str, AssetKey]]) – A mapping of the input names of the decorated graph to their corresponding asset keys. If not provided, the input asset keys will be created from the graph input names.

  • keys_by_output_name (Optional[Mapping[str, AssetKey]]) – A mapping of the output names of the decorated graph to their corresponding asset keys. If not provided, the output asset keys will be created from the graph output names.

  • key_prefix (Optional[Union[str, Sequence[str]]]) – If provided, key_prefix will be prepended to each key in keys_by_output_name. Each item in key_prefix must be a valid name in dagster (ie only contains letters, numbers, and _) and may not contain python reserved keywords.

  • internal_asset_deps (Optional[Mapping[str, Set[AssetKey]]]) – By default, it is assumed that all assets produced by the graph depend on all assets that are consumed by that graph. If this default is not correct, you pass in a map of output names to a corrected set of AssetKeys that they depend on. Any AssetKeys in this list must be either used as input to the asset or produced within the graph.

  • partitions_def (Optional[PartitionsDefinition]) – Defines the set of partition keys that compose the assets.

  • group_name (Optional[str]) – A group name for the constructed asset. Assets without a group name are assigned to a group called “default”.

  • resource_defs (Optional[Mapping[str, ResourceDefinition]]) – (Experimental) A mapping of resource keys to resource definitions. These resources will be initialized during execution, and can be accessed from the body of ops in the graph during execution.

  • partition_mappings (Optional[Mapping[str, PartitionMapping]]) – Defines how to map partition keys for this asset to partition keys of upstream assets. Each key in the dictionary correponds to one of the input assets, and each value is a PartitionMapping. If no entry is provided for a particular asset dependency, the partition mapping defaults to the default partition mapping for the partitions definition, which is typically maps partition keys to the same partition keys in upstream assets.

  • metadata_by_output_name (Optional[Mapping[str, MetadataUserInput]]) – Defines metadata to be associated with each of the output assets for this node. Keys are names of the outputs, and values are dictionaries of metadata to be associated with the related asset.

static from_op(op_def, *, keys_by_input_name=None, keys_by_output_name=None, key_prefix=None, internal_asset_deps=None, partitions_def=None, group_name=None, partition_mappings=None, metadata_by_output_name=None)[source]

Constructs an AssetsDefinition from an OpDefinition.

Parameters:
  • op_def (OpDefinition) – The OpDefinition that is an asset.

  • keys_by_input_name (Optional[Mapping[str, AssetKey]]) – A mapping of the input names of the decorated op to their corresponding asset keys. If not provided, the input asset keys will be created from the op input names.

  • keys_by_output_name (Optional[Mapping[str, AssetKey]]) – A mapping of the output names of the decorated op to their corresponding asset keys. If not provided, the output asset keys will be created from the op output names.

  • key_prefix (Optional[Union[str, Sequence[str]]]) – If provided, key_prefix will be prepended to each key in keys_by_output_name. Each item in key_prefix must be a valid name in dagster (ie only contains letters, numbers, and _) and may not contain python reserved keywords.

  • internal_asset_deps (Optional[Mapping[str, Set[AssetKey]]]) – By default, it is assumed that all assets produced by the op depend on all assets that are consumed by that op. If this default is not correct, you pass in a map of output names to a corrected set of AssetKeys that they depend on. Any AssetKeys in this list must be either used as input to the asset or produced within the op.

  • partitions_def (Optional[PartitionsDefinition]) – Defines the set of partition keys that compose the assets.

  • group_name (Optional[str]) – A group name for the constructed asset. Assets without a group name are assigned to a group called “default”.

  • partition_mappings (Optional[Mapping[str, PartitionMapping]]) – Defines how to map partition keys for this asset to partition keys of upstream assets. Each key in the dictionary correponds to one of the input assets, and each value is a PartitionMapping. If no entry is provided for a particular asset dependency, the partition mapping defaults to the default partition mapping for the partitions definition, which is typically maps partition keys to the same partition keys in upstream assets.

  • metadata_by_output_name (Optional[Mapping[str, MetadataUserInput]]) – Defines metadata to be associated with each of the output assets for this node. Keys are names of the outputs, and values are dictionaries of metadata to be associated with the related asset.

@dagster.multi_asset(*, outs, name=None, ins=None, non_argument_deps=None, description=None, config_schema=None, required_resource_keys=None, compute_kind=None, internal_asset_deps=None, partitions_def=None, op_tags=None, can_subset=False, resource_defs=None, group_name=None, retry_policy=None)[source]

Create a combined definition of multiple assets that are computed using the same op and same upstream assets.

Each argument to the decorated function references an upstream asset that this asset depends on. The name of the argument designates the name of the upstream asset.

Parameters:
  • name (Optional[str]) – The name of the op.

  • outs – (Optional[Dict[str, AssetOut]]): The AssetOuts representing the produced assets.

  • ins (Optional[Mapping[str, AssetIn]]) – A dictionary that maps input names to information about the input.

  • non_argument_deps (Optional[Union[Set[AssetKey], Set[str]]]) – Set of asset keys that are upstream dependencies, but do not pass an input to the multi_asset.

  • config_schema (Optional[ConfigSchema) – The configuration schema for the asset’s underlying op. If set, Dagster will check that config provided for the op matches this schema and fail if it does not. If not set, Dagster will accept any config provided for the op.

  • required_resource_keys (Optional[Set[str]]) – Set of resource handles required by the underlying op.

  • compute_kind (Optional[str]) – A string to represent the kind of computation that produces the asset, e.g. “dbt” or “spark”. It will be displayed in Dagit as a badge on the asset.

  • internal_asset_deps (Optional[Mapping[str, Set[AssetKey]]]) – By default, it is assumed that all assets produced by a multi_asset depend on all assets that are consumed by that multi asset. If this default is not correct, you pass in a map of output names to a corrected set of AssetKeys that they depend on. Any AssetKeys in this list must be either used as input to the asset or produced within the op.

  • partitions_def (Optional[PartitionsDefinition]) – Defines the set of partition keys that compose the assets.

  • op_tags (Optional[Dict[str, Any]]) – A dictionary of tags for the op that computes the asset. Frameworks may expect and require certain metadata to be attached to a op. Values that are not strings will be json encoded and must meet the criteria that json.loads(json.dumps(value)) == value.

  • can_subset (bool) – If this asset’s computation can emit a subset of the asset keys based on the context.selected_assets argument. Defaults to False.

  • resource_defs (Optional[Mapping[str, ResourceDefinition]]) – (Experimental) A mapping of resource keys to resource definitions. These resources will be initialized during execution, and can be accessed from the context within the body of the function.

  • group_name (Optional[str]) – A string name used to organize multiple assets into groups. This group name will be applied to all assets produced by this multi_asset.

  • retry_policy (Optional[RetryPolicy]) – The retry policy for the op that computes the asset.

class dagster.AssetOut(key_prefix=None, key=None, dagster_type=<class 'dagster._core.definitions.utils.NoValueSentinel'>, description=None, is_required=True, io_manager_key=None, metadata=None, group_name=None)[source]

Defines one of the assets produced by a @multi_asset.

key_prefix

If provided, the asset’s key is the concatenation of the key_prefix and the asset’s name. When using @multi_asset, the asset name defaults to the key of the “outs” dictionary Only one of the “key_prefix” and “key” arguments should be provided.

Type:

Optional[Union[str, Sequence[str]]]

key

The asset’s key. Only one of the “key_prefix” and “key” arguments should be provided.

Type:

Optional[Union[str, Sequence[str], AssetKey]]

dagster_type

The type of this output. Should only be set if the correct type can not be inferred directly from the type signature of the decorated function.

Type:

Optional[Union[Type, DagsterType]]]

description

Human-readable description of the output.

Type:

Optional[str]

is_required

Whether the presence of this field is required. (default: True)

Type:

bool

io_manager_key

The resource key of the IO manager used for this output. (default: “io_manager”).

Type:

Optional[str]

metadata

A dict of the metadata for the output. For example, users can provide a file path if the data object will be stored in a filesystem, or provide information of a database table when it is going to load the data into the table.

Type:

Optional[Dict[str, Any]]

group_name

A string name used to organize multiple assets into groups. If not provided, the name “default” is used.

Type:

Optional[str]

class dagster.AssetValueLoader(assets_defs_by_key, instance=None)[source]

Caches resource definitions that are used to load asset values across multiple load invocations.

Should not be instantiated directly. Instead, use get_asset_value_loader().

load_asset_value(asset_key, *, python_type=None, partition_key=None)[source]

Loads the contents of an asset as a Python object.

Invokes load_input on the IOManager associated with the asset.

Parameters:
  • asset_key (Union[AssetKey, Sequence[str], str]) – The key of the asset to load.

  • python_type (Optional[Type]) – The python type to load the asset as. This is what will be returned inside load_input by context.dagster_type.typing_type.

  • partition_key (Optional[str]) – The partition of the asset to load.

Returns:

The contents of an asset as a Python object.