# Source code for dagster_spark.ops

from dagster import In, Nothing, Out
from dagster import _check as check
from dagster import op

from .configs import define_spark_config


def create_spark_op(
    name, main_class, description=None, required_resource_keys=frozenset(["spark"])
):
    """Build an op that submits a Spark job through the ``spark`` resource.

    The returned op takes a single ``Nothing`` input named ``start`` and
    produces a ``Nothing`` output, so it can only be sequenced relative to
    other ops rather than exchange data with them. Its config schema is the
    one returned by ``define_spark_config()``.

    Args:
        name: Name given to the generated op.
        main_class: Main class of the Spark job. It is recorded in the op's
            ``main_class`` tag and passed to ``run_spark_job`` at execution.
        description: Optional op description. Defaults to
            ``"A parameterized Spark job."`` when omitted or ``None``.
        required_resource_keys: Set of resource keys the op requires. The op
            body always reads ``context.resources.spark``, so the set needs
            to contain ``"spark"``.

    Returns:
        The op definition produced by the ``@op`` decorator.

    Raises:
        A dagster check error if an argument has the wrong type
        (presumably ``ParameterCheckError`` -- confirm against ``_check``).
    """
    check.str_param(name, "name")
    check.str_param(main_class, "main_class")
    # opt_str_param returns the resolved value; without the assignment the
    # default description would be validated and then silently thrown away.
    description = check.opt_str_param(
        description, "description", "A parameterized Spark job."
    )
    check.set_param(required_resource_keys, "required_resource_keys")

    @op(
        name=name,
        description=description,
        config_schema=define_spark_config(),
        ins={"start": In(Nothing)},
        out=Out(Nothing),
        tags={"kind": "spark", "main_class": main_class},
        required_resource_keys=required_resource_keys,
    )
    def spark_op(context):
        # Delegate the actual submission to the configured spark resource.
        context.resources.spark.run_spark_job(context.op_config, main_class)

    return spark_op