Source code for dagster_airflow.dagster_job_factory
from dagster_airflow.dagster_pipeline_factory import make_dagster_pipeline_from_airflow_dag
def make_dagster_job_from_airflow_dag(
    dag, tags=None, use_airflow_template_context=False, unique_id=None, mock_xcom=False
):
    """Construct a Dagster job corresponding to a given Airflow DAG.

    Tasks in the resulting job will execute the ``execute()`` method on the corresponding
    Airflow Operator. Dagster, any dependencies required by Airflow Operators, and the module
    containing your DAG definition must be available in the Python environment within which your
    Dagster ops execute.

    To set Airflow's ``execution_date`` for use with Airflow Operators' ``execute()`` methods,
    either:

    1. (Best for ad hoc runs) Execute the job directly. This will set ``execution_date`` to the
       time (in UTC) of the run.
    2. Add ``{'airflow_execution_date': utc_date_string}`` to the job tags. This will override
       the behavior from (1).

       .. code-block:: python

           my_dagster_job = make_dagster_job_from_airflow_dag(
               dag=dag,
               tags={'airflow_execution_date': utc_execution_date_str}
           )
           my_dagster_job.execute_in_process()

    3. (Recommended) Add ``{'airflow_execution_date': utc_date_string}`` to the run tags,
       such as in the Dagit UI. This will override the behavior from (1) and (2).

    We apply ``normalized_name()`` to the DAG id and task ids when generating the job name and
    op names to ensure that names conform to Dagster's naming conventions.

    Args:
        dag (DAG): The Airflow DAG to compile into a Dagster job.
        tags (Dict[str, str]): Job tags. Optionally include
            ``tags={'airflow_execution_date': utc_date_string}`` to specify the
            ``execution_date`` used when executing Airflow Operators.
        use_airflow_template_context (bool): If True, will call ``get_template_context()`` on the
            Airflow TaskInstance model, which requires and modifies the DagRun table.
            (default: False)
        unique_id (int): If not None, this id will be appended to generated op names. Used by
            framework authors to enforce unique op names within a repo.
        mock_xcom (bool): If True, Dagster will mock out all calls made to XCom; features that
            depend on XCom may not work as expected. (default: False)

    Returns:
        JobDefinition: The generated Dagster job.
    """
    pipeline_def = make_dagster_pipeline_from_airflow_dag(
        dag, tags, use_airflow_template_context, unique_id, mock_xcom
    )
    # Pass the tags explicitly because pipeline_def.graph does not carry them through.
    return pipeline_def.graph.to_job(tags={**pipeline_def.tags})
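The naming rule and the ``execution_date`` precedence described in the docstring can be illustrated in plain Python. This is a sketch under stated assumptions, not the library's code. ``sketch_normalized_name`` and ``sketch_resolve_execution_date`` are hypothetical helpers written only for this illustration and are not part of ``dagster_airflow``. The first assumes Dagster names may contain only letters, digits, and underscores, and the real ``normalized_name()`` may differ in details. The second models only the documented order: run tags, then job tags, then the time of the run in UTC. It does not read real Dagster run tags.

```python
import re
from datetime import datetime, timezone
from typing import Mapping, Optional


# Hypothetical helper: illustrates the documented naming rule only; it is
# NOT dagster_airflow's actual normalized_name() implementation.
def sketch_normalized_name(name: str, unique_id: Optional[int] = None) -> str:
    # Assumption: valid Dagster names use only [A-Za-z0-9_], so every other
    # character (e.g. '-' or '.') is replaced with an underscore.
    base = re.sub(r"[^A-Za-z0-9_]", "_", name)
    # Mirrors the documented unique_id behavior: append it when not None.
    return base if unique_id is None else f"{base}_{unique_id}"


# Hypothetical helper: models the documented precedence for the
# 'airflow_execution_date' tag (run tags > job tags > run time in UTC).
def sketch_resolve_execution_date(
    run_tags: Mapping[str, str],
    job_tags: Mapping[str, str],
    now: Optional[datetime] = None,
) -> str:
    key = "airflow_execution_date"
    if key in run_tags:  # option (3): run tags override everything
        return run_tags[key]
    if key in job_tags:  # option (2): job tags override the default
        return job_tags[key]
    # Option (1): fall back to the time of the run, in UTC.
    return (now or datetime.now(timezone.utc)).isoformat()


if __name__ == "__main__":
    print(sketch_normalized_name("my-dag.extract"))  # my_dag_extract
    print(sketch_normalized_name("my-dag.extract", unique_id=7))  # my_dag_extract_7
    job_tags = {"airflow_execution_date": "2021-01-01T00:00:00+00:00"}
    run_tags = {"airflow_execution_date": "2021-06-01T00:00:00+00:00"}
    print(sketch_resolve_execution_date(run_tags, job_tags))  # 2021-06-01T00:00:00+00:00
    print(sketch_resolve_execution_date({}, job_tags))  # 2021-01-01T00:00:00+00:00
```

Keeping the precedence in a single lookup function makes the override order explicit: a later, more specific source (the run) always wins over an earlier, more general one (the job definition).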