# Source code for dagster_airflow.operators.airflow_operator_to_op

import logging
import os
from contextlib import contextmanager
from typing import Optional, Sequence

from airflow.models import Connection
from airflow.models.baseoperator import BaseOperator

from dagster import Any, In, Nothing, OpDefinition, Out
from dagster import _check as check
from dagster import op


@contextmanager
def replace_airflow_logger_handlers(handler):
    """Temporarily attach ``handler`` to the root logger for the duration of the block.

    Used to capture log records emitted by Airflow hooks (which log through their own
    logger objects) and route them into Dagster's log handler.

    Args:
        handler (logging.Handler): The handler to attach to the root logger.

    Yields:
        None. The handler is removed again on exit, even if the block raises.
    """
    # Bind and attach outside the try: if addHandler itself failed we would not want
    # the finally to remove a handler that was never added (and the original's
    # in-try binding could raise a secondary NameError in the finally clause).
    root_logger = logging.getLogger()
    root_logger.addHandler(handler)
    try:
        yield
    finally:
        root_logger.removeHandler(handler)


def airflow_operator_to_op(
    airflow_op: BaseOperator,
    connections: Optional[Sequence[Connection]] = None,
    capture_python_logs: bool = True,
    return_output: bool = False,
) -> OpDefinition:
    """Construct a Dagster op corresponding to a given Airflow operator.

    The resulting op will execute the ``execute()`` method on the Airflow operator. Dagster and
    any dependencies required by the Airflow Operator must be available in the Python environment
    within which your Dagster ops execute.

    To specify Airflow connections utilized by the operator, instantiate and pass Airflow
    connection objects in a list to the ``connections`` parameter of this function.

    .. code-block:: python

        http_task = SimpleHttpOperator(task_id="my_http_task", endpoint="foo")
        connections = [Connection(conn_id="http_default", host="https://mycoolwebsite.com")]
        dagster_op = airflow_operator_to_op(http_task, connections=connections)

    In order to specify extra parameters to the connection, call the ``set_extra()`` method
    on the instantiated Airflow connection:

    .. code-block:: python

        s3_conn = Connection(conn_id=f's3_conn', conn_type="s3")
        s3_conn_extra = {
            "aws_access_key_id": "my_access_key",
            "aws_secret_access_key": "my_secret_access_key",
        }
        s3_conn.set_extra(json.dumps(s3_conn_extra))

    Args:
        airflow_op (BaseOperator): The Airflow operator to convert into a Dagster op
        connections (Optional[List[Connection]]): Airflow connections utilized by the operator.
        capture_python_logs (bool): If False, will not redirect Airflow logs to compute logs.
            (default: True)
        return_output (bool): If True, will return any output from the Airflow operator.
            (default: False)

    Returns:
        converted_op (OpDefinition): The generated Dagster op
    """
    airflow_op = check.inst_param(airflow_op, "airflow_op", BaseOperator)
    connections = check.opt_sequence_param(connections, "connections", Connection)

    @op(
        name=airflow_op.task_id,
        ins={"start_after": In(Nothing)},
        out=Out(Any) if return_output else Out(Nothing),
    )
    def converted_op(context):
        # Expose each connection to Airflow via the AIRFLOW_CONN_<CONN_ID> env var
        # convention that Airflow hooks use for connection lookup.
        conn_names = []
        for connection in connections:
            conn_name = f"AIRFLOW_CONN_{connection.conn_id}".upper()
            os.environ[conn_name] = connection.get_uri()
            conn_names.append(conn_name)

        try:
            if capture_python_logs:
                # Airflow has local logging configuration that may set logging.Logger.propagate
                # to be false. We override the logger object and replace it with
                # DagsterLogManager.
                airflow_op._log = context.log  # pylint: disable=protected-access
                # Airflow operators and hooks use separate logger objects. We add a handler to
                # receive logs from hooks.
                with replace_airflow_logger_handlers(
                    context.log._dagster_handler  # pylint: disable=protected-access
                ):
                    output = airflow_op.execute({})
            else:
                output = airflow_op.execute({})
        finally:
            # Always remove the env vars, even when execute() raises, so connection
            # credentials do not leak to later ops in the same process. pop(..., None)
            # tolerates a var having been removed by the operator itself.
            for conn_name in conn_names:
                os.environ.pop(conn_name, None)

        if return_output:
            return output

    return converted_op