"""Source code for dagster_dbt.ops: Dagster ops that run dbt commands through a ``dbt`` resource."""

from dagster import Array, Bool, Field, In, Nothing, Out, Output, op

from .types import DbtOutput
from .utils import generate_events, generate_materializations

# Keyword arguments shared by every ``@op`` in this module (double-splatted into each decorator):
# each op requires the "dbt" resource, exposes a ``Nothing``-typed ``start_after`` input that can
# be used to sequence it after other ops, emits a ``DbtOutput``, and is tagged ``kind: dbt``.
_DEFAULT_OP_PROPS = {
    "required_resource_keys": {"dbt"},
    "ins": {"start_after": In(Nothing)},
    "out": Out(DbtOutput, description="Parsed output from running the dbt command."),
    "tags": {"kind": "dbt"},
}


def _get_doc(op_name: str, dbt_command: str) -> str:
    return f"""
This op executes a ``dbt {dbt_command}`` command. It requires the use of a dbt resource, which can be
set to execute this command through the CLI (using the :py:class:`~dagster_dbt.dbt_cli_resource`) or
over RPC (using the :py:class:`~dbt_rpc_sync_resource`).

Examples:

.. code-block:: python

    from dagster import job
    from dagster_dbt import {op_name}, dbt_cli_resource, dbt_rpc_sync_resource

    @job(resource_defs={{"dbt":dbt_cli_resource}})
    def my_dbt_cli_job():
        {op_name}()

    @job(resource_defs={{"dbt":dbt_rpc_sync_resource}})
    def my_dbt_rpc_job():
        {op_name}()
    """


# NOTE: mypy fails to properly track the type of `_DEFAULT_OP_PROPS` items when they are
# double-splatted, so we type-ignore the below op declarations.


# Runs ``dbt build`` through the "dbt" resource. If ``yield_asset_events`` is enabled and the
# parsed result contains a "results" entry, the events produced by ``generate_events`` are yielded
# first (asset keys = ``asset_key_prefix`` + the node's ``unique_id`` split on "."), followed by
# the DbtOutput itself.
# Documented with comments rather than a docstring: ``@op`` would turn a docstring into the op's
# description, changing what this op reports.
@op(  # type: ignore
    **_DEFAULT_OP_PROPS,
    config_schema={
        "yield_asset_events": Field(
            config=Bool,
            default_value=True,
            description=(
                "If True, materializations and asset observations corresponding to the results of "
                "the dbt operation will be yielded when the op executes. Default: True"
            ),
        ),
        "asset_key_prefix": Field(
            config=Array(str),
            default_value=["dbt"],
            description=(
                "If provided and yield_asset_events is True, these components will be used to "
                "prefix the generated asset keys."
            ),
        ),
    },
)
def dbt_build_op(context):
    dbt_output = context.resources.dbt.build()
    # Only emit asset events when enabled and dbt reported per-node results.
    if context.op_config["yield_asset_events"] and "results" in dbt_output.result:
        prefix = context.op_config["asset_key_prefix"]
        yield from generate_events(
            dbt_output,
            # List concatenation builds a fresh key path per node; ``prefix`` is never mutated.
            node_info_to_asset_key=lambda info: prefix + info["unique_id"].split("."),
            manifest_json=context.resources.dbt.get_manifest_json(),
        )
    yield Output(dbt_output)


# Runs ``dbt run`` through the "dbt" resource. If ``yield_materializations`` is enabled and the
# parsed result contains a "results" entry, the materializations built by
# ``generate_materializations`` (keys prefixed with ``asset_key_prefix``) are yielded first,
# followed by the DbtOutput itself.
@op(  # type: ignore
    **_DEFAULT_OP_PROPS,
    config_schema={
        "yield_materializations": Field(
            config=Bool,
            default_value=True,
            description=(
                "If True, materializations corresponding to the results of the dbt operation will "
                "be yielded when the op executes. Default: True"
            ),
        ),
        "asset_key_prefix": Field(
            config=Array(str),
            default_value=["dbt"],
            description=(
                "If provided and yield_materializations is True, these components will be used to "
                "prefix the generated asset keys."
            ),
        ),
    },
)
def dbt_run_op(context):
    dbt_output = context.resources.dbt.run()
    # Only emit materializations when enabled and dbt reported per-node results.
    if context.op_config["yield_materializations"] and "results" in dbt_output.result:
        yield from generate_materializations(
            dbt_output, asset_key_prefix=context.op_config["asset_key_prefix"]
        )
    yield Output(dbt_output)
# Runs ``dbt compile`` through the "dbt" resource and returns the resource's result.
@op(**_DEFAULT_OP_PROPS)  # type: ignore
def dbt_compile_op(context):
    return context.resources.dbt.compile()
# Runs ``dbt ls`` through the "dbt" resource and returns the resource's result.
@op(**_DEFAULT_OP_PROPS)  # type: ignore
def dbt_ls_op(context):
    return context.resources.dbt.ls()
# Runs ``dbt test`` through the "dbt" resource and returns the resource's result.
@op(**_DEFAULT_OP_PROPS)  # type: ignore
def dbt_test_op(context):
    return context.resources.dbt.test()
# Runs ``dbt snapshot`` through the "dbt" resource and returns the resource's result.
@op(**_DEFAULT_OP_PROPS)  # type: ignore
def dbt_snapshot_op(context):
    return context.resources.dbt.snapshot()
# Runs ``dbt seed`` through the "dbt" resource and returns the resource's result.
@op(**_DEFAULT_OP_PROPS)  # type: ignore
def dbt_seed_op(context):
    return context.resources.dbt.seed()
# Runs ``dbt docs generate`` (via the resource's ``generate_docs`` method) and returns its result.
@op(**_DEFAULT_OP_PROPS)  # type: ignore
def dbt_docs_generate_op(context):
    return context.resources.dbt.generate_docs()
# Give every op a generated usage docstring naming the dbt subcommand it executes
# (the op's ``name`` is interpolated into the example code).
for dbt_op, cmd in (
    (dbt_build_op, "build"),
    (dbt_run_op, "run"),
    (dbt_compile_op, "compile"),
    (dbt_ls_op, "ls"),
    (dbt_test_op, "test"),
    (dbt_snapshot_op, "snapshot"),
    (dbt_seed_op, "seed"),
    (dbt_docs_generate_op, "docs generate"),
):
    dbt_op.__doc__ = _get_doc(dbt_op.name, cmd)