Source code for dagster_airbyte.ops

from dagster_airbyte.resources import DEFAULT_POLL_INTERVAL_SECONDS
from dagster_airbyte.types import AirbyteOutput
from dagster_airbyte.utils import _get_attempt, generate_materializations

from dagster import Array, Bool, Field, In, Noneable, Nothing, Out, Output, op


@op(
    required_resource_keys={"airbyte"},
    ins={"start_after": In(Nothing)},
    out=Out(
        AirbyteOutput,
        description="Parsed JSON dictionary representing the details of the Airbyte connector after "
        "the sync successfully completes. "
        "See the [Airbyte API Docs](https://airbyte-public-api-docs.s3.us-east-2.amazonaws.com/rapidoc-api-docs.html#overview) "
        "for detailed information on this response.",
    ),
    config_schema={
        "connection_id": Field(
            str,
            is_required=True,
            description="The Airbyte Connection ID that this op will sync. You can retrieve this "
            'value from the "Connections" tab of a given connector in the Airbyte UI.',
        ),
        "poll_interval": Field(
            float,
            default_value=DEFAULT_POLL_INTERVAL_SECONDS,
            description="The time (in seconds) that will be waited between successive polls.",
        ),
        "poll_timeout": Field(
            Noneable(float),
            default_value=None,
            description="The maximum time that will be waited before this operation is timed out. By "
            "default, this will never time out.",
        ),
        "yield_materializations": Field(
            config=Bool,
            default_value=True,
            description=(
                "If True, materializations corresponding to the results of the Airbyte sync will "
                "be yielded when the op executes."
            ),
        ),
        "asset_key_prefix": Field(
            config=Array(str),
            default_value=["airbyte"],
            description=(
                "If provided and yield_materializations is True, these components will be used to "
                "prefix the generated asset keys."
            ),
        ),
    },
    tags={"kind": "airbyte"},
)
def airbyte_sync_op(context):
    """
    Executes an Airbyte job sync for a given ``connection_id``, polling until that sync
    completes and raising an error if it is unsuccessful. It outputs an ``AirbyteOutput``
    which contains the job details for the given ``connection_id``.

    It requires the use of the :py:class:`~dagster_airbyte.airbyte_resource`, which allows it to
    communicate with the Airbyte API.

    Examples:

    .. code-block:: python

        from dagster import job
        from dagster_airbyte import airbyte_resource, airbyte_sync_op

        my_airbyte_resource = airbyte_resource.configured(
            {
                "host": {"env": "AIRBYTE_HOST"},
                "port": {"env": "AIRBYTE_PORT"},
            }
        )

        sync_foobar = airbyte_sync_op.configured({"connection_id": "foobar"}, name="sync_foobar")

        @job(resource_defs={"airbyte": my_airbyte_resource})
        def my_simple_airbyte_job():
            sync_foobar()

        @job(resource_defs={"airbyte": my_airbyte_resource})
        def my_composed_airbyte_job():
            final_foobar_state = sync_foobar(start_after=some_op())
            other_op(final_foobar_state)
    """
    airbyte_output = context.resources.airbyte.sync_and_poll(
        connection_id=context.op_config["connection_id"],
        poll_interval=context.op_config["poll_interval"],
        poll_timeout=context.op_config["poll_timeout"],
    )
    if context.op_config["yield_materializations"]:
        yield from generate_materializations(
            airbyte_output, asset_key_prefix=context.op_config["asset_key_prefix"]
        )
    yield Output(
        airbyte_output,
        metadata={
            **_get_attempt(airbyte_output.job_details.get("attempts", [{}])[-1]).get(
                "totalStats", {}
            )
        },
    )
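A minimal usage sketch (not part of the module above): the docstring example bakes ``connection_id`` in with ``.configured``, but all of the config fields in the schema can also be supplied at launch time via run config. The connection id ``"foobar"`` and the poll values below are illustrative assumptions, not library defaults.

from dagster import job
from dagster_airbyte import airbyte_resource, airbyte_sync_op

my_airbyte_resource = airbyte_resource.configured(
    {
        "host": {"env": "AIRBYTE_HOST"},
        "port": {"env": "AIRBYTE_PORT"},
    }
)

@job(resource_defs={"airbyte": my_airbyte_resource})
def my_airbyte_job():
    airbyte_sync_op()

# Launch the job in-process, passing the op's config at run time. The keys
# under "config" match the config_schema defined on airbyte_sync_op; the
# values here are placeholders.
result = my_airbyte_job.execute_in_process(
    run_config={
        "ops": {
            "airbyte_sync_op": {
                "config": {
                    "connection_id": "foobar",
                    "poll_interval": 15.0,
                    "poll_timeout": 600.0,
                }
            }
        }
    }
)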