Source code for dagster_databricks.solids

from dagster import Field, In, Nothing, Out, Permissive
from dagster import _check as check
from dagster import op

from .databricks import wait_for_run_to_complete

_START = "start"

_DEFAULT_POLL_INTERVAL = 10
# wait at most 24 hours by default for run execution
_DEFAULT_RUN_MAX_WAIT_TIME_SEC = 24 * 60 * 60


def create_databricks_job_op(
    name="databricks_job",
    num_inputs=1,
    description=None,
    required_resource_keys=frozenset(["databricks_client"]),
):
    """
    Creates an op that launches a Databricks job (not to be confused with Dagster's job API).

    As config, the op accepts a blob of the form described in Databricks' job API:
    https://docs.databricks.com/dev-tools/api/latest/jobs.html.

    Returns:
        OpDefinition: An op definition.

    Example:

        .. code-block:: python

            from dagster import job
            from dagster_databricks import create_databricks_job_op, databricks_client

            sparkpi = create_databricks_job_op().configured(
                {
                    "job": {
                        "run_name": "SparkPi Python job",
                        "new_cluster": {
                            "spark_version": "7.3.x-scala2.12",
                            "node_type_id": "i3.xlarge",
                            "num_workers": 2,
                        },
                        "spark_python_task": {
                            "python_file": "dbfs:/docs/pi.py",
                            "parameters": ["10"],
                        },
                    }
                },
                name="sparkpi",
            )

            @job(
                resource_defs={
                    "databricks_client": databricks_client.configured(
                        {"host": "my.workspace.url", "token": "my.access.token"}
                    )
                }
            )
            def do_stuff():
                sparkpi()
    """
    check.str_param(name, "name")
    check.opt_str_param(description, "description")
    check.int_param(num_inputs, "num_inputs")
    check.set_param(required_resource_keys, "required_resource_keys", of_type=str)

    ins = {"input_" + str(i): In(Nothing) for i in range(num_inputs)}

    @op(
        name=name,
        description=description,
        config_schema={
            "job": Field(
                Permissive(),
                description="Databricks job run configuration, in the form described in "
                "Databricks' job API: https://docs.databricks.com/dev-tools/api/latest/jobs.html",
            ),
            "poll_interval_sec": Field(
                float,
                description="Check whether the job is done at this interval.",
                default_value=_DEFAULT_POLL_INTERVAL,
            ),
            "max_wait_time_sec": Field(
                float,
                description="If the job is not complete after this length of time, raise an error.",
                default_value=_DEFAULT_RUN_MAX_WAIT_TIME_SEC,
            ),
        },
        ins=ins,
        out=Out(Nothing),
        required_resource_keys=required_resource_keys,
        tags={"kind": "databricks"},
    )
    def databricks_fn(context):
        job_config = context.op_config["job"]
        databricks_client = context.resources.databricks_client
        run_id = databricks_client.submit_run(**job_config)

        context.log.info(
            "Launched databricks job with run id {run_id}. UI: {url}. Waiting for run to complete...".format(
                run_id=run_id, url=create_ui_url(databricks_client, context.op_config)
            )
        )
        wait_for_run_to_complete(
            databricks_client,
            context.log,
            run_id,
            context.op_config["poll_interval_sec"],
            context.op_config["max_wait_time_sec"],
        )

    return databricks_fn
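
# Illustration (not part of the original module): a minimal usage sketch, assuming a
# hypothetical upstream op `prepare_inputs`. With num_inputs=2, the factory generates
# Nothing-typed ins named "input_0" and "input_1", so the Databricks op starts only
# after both upstream ops finish:
#
#     from dagster import job, op
#     from dagster_databricks import create_databricks_job_op, databricks_client
#
#     @op
#     def prepare_inputs():
#         """Hypothetical upstream op; exists only to order execution."""
#
#     run_etl = create_databricks_job_op(name="run_etl", num_inputs=2)
#
#     @job(
#         resource_defs={
#             "databricks_client": databricks_client.configured(
#                 {"host": "my.workspace.url", "token": "my.access.token"}
#             )
#         }
#     )
#     def sequenced():
#         run_etl(input_0=prepare_inputs(), input_1=prepare_inputs())

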
def create_ui_url(databricks_client, op_config):
    host = databricks_client.host
    workspace_id = databricks_client.workspace_id or "<workspace_id>"
    if "existing_cluster_id" in op_config["job"]:
        return "https://{host}/?o={workspace_id}#/setting/clusters/{cluster_id}/sparkUi".format(
            host=host,
            workspace_id=workspace_id,
            cluster_id=op_config["job"]["existing_cluster_id"],
        )
    else:
        return "https://{host}/?o={workspace_id}#joblist".format(
            host=host, workspace_id=workspace_id
        )
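
# Illustration (not part of the original module): the two URL shapes create_ui_url
# produces, using a hypothetical stand-in for the Databricks client resource; all
# host, workspace, and cluster values below are placeholders:
#
#     class _FakeDatabricksClient:
#         host = "my.workspace.url"
#         workspace_id = "12345"
#
#     create_ui_url(_FakeDatabricksClient(), {"job": {"existing_cluster_id": "abc-123"}})
#     # -> "https://my.workspace.url/?o=12345#/setting/clusters/abc-123/sparkUi"
#
#     create_ui_url(_FakeDatabricksClient(), {"job": {}})
#     # -> "https://my.workspace.url/?o=12345#joblist"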