Dynamic Mapping & Collect

These APIs provide the means for a simple kind of dynamic orchestration — where the work to be orchestrated is determined not at job definition time but at runtime, dependent on data that’s observed as part of job execution.

class dagster.DynamicOut(dagster_type=<class 'dagster._core.definitions.utils.NoValueSentinel'>, description=None, is_required=True, io_manager_key=None, metadata=None, asset_key=None, asset_partitions=None, asset_partitions_def=None)

Variant of Out for an output that will dynamically alter the graph at runtime.

When used in a composition function such as @graph, dynamic outputs must be used with either:

  • map - clone downstream ops for each separate DynamicOutput

  • collect - gather across all DynamicOutput values into a list

Uses the same constructor as Out.

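import os
import re

from dagster import DynamicOut, DynamicOutput, Field, file_relative_path, job, op

def _clean(filename):
    # Hypothetical helper: mapping keys may only use letters, digits, and underscores
    return re.sub(r"[^A-Za-z0-9_]", "_", filename)

@op(
    config_schema={
        "path": Field(str, default_value=file_relative_path(__file__, "sample"))
    },
    out=DynamicOut(str),
)
def files_in_directory(context):
    path = context.op_config["path"]
    dirname, _, filenames = next(os.walk(path))
    for file in filenames:
        # one DynamicOutput per file, keyed by its sanitized name
        yield DynamicOutput(os.path.join(dirname, file), mapping_key=_clean(file))

@op
def process_file(path: str) -> int:
    # Hypothetical per-file step: return the file size in bytes
    return os.path.getsize(path)

@op
def summarize_directory(sizes):
    # Hypothetical summary step: receives the collected per-file results as a list
    return sum(sizes)

@job
def process_directory():
    files = files_in_directory()

    # use map to invoke an op on each dynamic output
    file_results = files.map(process_file)

    # use collect to gather the results into a list
    summarize_directory(file_results.collect())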
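
The fan-out/fan-in shape of process_directory can be modelled in plain Python to make the semantics of map and collect concrete. This is a conceptual sketch under stated assumptions, not Dagster's implementation: map_dynamic and collect are hypothetical helpers, and the op_name[mapping_key] labels only mimic how mapped op invocations are identified.

```python
from typing import Callable, Dict, List, Tuple, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def map_dynamic(outputs: List[Tuple[str, T]], fn: Callable[[T], R]) -> Dict[str, R]:
    """Hypothetical model of `map`: one invocation of `fn` per (mapping_key, value) pair.

    How many invocations happen is only known once `outputs` exists at runtime.
    """
    results: Dict[str, R] = {}
    for mapping_key, value in outputs:
        # label each clone the way mapped ops are addressed: op_name[mapping_key]
        results[f"{fn.__name__}[{mapping_key}]"] = fn(value)
    return results


def collect(results: Dict[str, R]) -> List[R]:
    """Hypothetical model of `collect`: gather every mapped result into one list."""
    return list(results.values())


def process_file(path: str) -> int:
    # stand-in per-item work for this sketch: the length of the path string
    return len(path)


# Pretend these (mapping_key, value) pairs were yielded at runtime
dynamic_outputs = [("a_txt", "dir/a.txt"), ("bb_csv", "dir/bb.csv")]
mapped = map_dynamic(dynamic_outputs, process_file)
print(mapped)           # {'process_file[a_txt]': 9, 'process_file[bb_csv]': 10}
print(collect(mapped))  # [9, 10]
```

The sketch runs the mapped calls sequentially; in Dagster the mapped ops are separate steps and may run in parallel depending on the executor.
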

class dagster.DynamicOutput(value, mapping_key, output_name='result', metadata_entries=None, metadata=None)

Variant of Output used to support dynamic mapping & collect. Each DynamicOutput produced by an op represents one item in a set that can be processed individually with map or gathered with collect.

Each DynamicOutput must have a unique mapping_key to distinguish it within its set.
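
Because mapping keys must be unique within a set, a sanitizing helper such as the _clean(file) call in the files_in_directory example has to avoid collisions when two file names reduce to the same key. The sketch below is hypothetical: make_mapping_keys is not a Dagster function, it assumes keys are restricted to letters, digits, and underscores, and it resolves collisions by appending a numeric suffix.

```python
import re
from typing import Dict, List

# Assumption: mapping keys may only contain letters, digits, and underscores.
_INVALID = re.compile(r"[^A-Za-z0-9_]")


def make_mapping_keys(filenames: List[str]) -> Dict[str, str]:
    """Hypothetical helper: map each file name to a unique, sanitized mapping key."""
    keys: Dict[str, str] = {}
    used = set()
    for name in filenames:
        base = _INVALID.sub("_", name)
        key, n = base, 1
        # on collision, append _1, _2, ... until the key is unused
        while key in used:
            key = f"{base}_{n}"
            n += 1
        used.add(key)
        keys[name] = key
    return keys


print(make_mapping_keys(["a.txt", "a-txt", "b c.csv"]))
# {'a.txt': 'a_txt', 'a-txt': 'a_txt_1', 'b c.csv': 'b_c_csv'}
```
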

Parameters:
  • value (Any) – The value returned by the compute function.

  • mapping_key (str) – The key that uniquely identifies this dynamic value relative to its peers. This key will be used to identify the downstream ops when mapped, i.e., mapped_op[example_mapping_key]

  • output_name (Optional[str]) – Name of the corresponding DynamicOut defined on the op. (default: “result”)

  • metadata_entries (Optional[Union[MetadataEntry, PartitionMetadataEntry]]) – (Experimental) A set of metadata entries to attach to events related to this output.

  • metadata (Optional[Dict[str, Union[str, float, int, MetadataValue]]]) – Arbitrary metadata about the output. Keys are displayed string labels, and values are one of the following: string, float, int, JSON-serializable dict, JSON-serializable list, or one of the data classes returned by a MetadataValue static method.
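
Since mapped invocations are identified as mapped_op[example_mapping_key], per-invocation results can be regrouped by mapping key. The sketch below assumes identifiers of exactly that name[key] shape (a single key, letters, digits, and underscores only); split_mapped_id and results_by_key are hypothetical helpers, not part of Dagster.

```python
import re
from typing import Dict, Optional, Tuple

# Assumption: mapped identifiers look like "process_file[a_txt]"
_MAPPED_ID = re.compile(r"^(?P<op>[A-Za-z0-9_]+)\[(?P<key>[A-Za-z0-9_]+)\]$")


def split_mapped_id(identifier: str) -> Optional[Tuple[str, str]]:
    """Hypothetical helper: return (op_name, mapping_key), or None if not mapped."""
    match = _MAPPED_ID.match(identifier)
    if match is None:
        return None
    return match.group("op"), match.group("key")


def results_by_key(results: Dict[str, int], op_name: str) -> Dict[str, int]:
    """Keep only the mapped invocations of `op_name`, keyed by mapping key."""
    grouped: Dict[str, int] = {}
    for identifier, value in results.items():
        parts = split_mapped_id(identifier)
        if parts is not None and parts[0] == op_name:
            grouped[parts[1]] = value
    return grouped


step_results = {
    "process_file[a_txt]": 9,
    "process_file[bb_csv]": 10,
    "summarize_directory": 19,  # not mapped, so it is skipped below
}
print(results_by_key(step_results, "process_file"))  # {'a_txt': 9, 'bb_csv': 10}
```
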