A Dagster job is usually based on a graph of connected ops, but its graph can also contain other graphs. Nesting graphs is useful for organizing large or complicated graphs and for abstracting away complexity. Dagster supports arbitrary levels of nesting.
We use the term node to refer to both ops and graphs, because both ops and graphs can be used as nodes inside graphs.
As a baseline, here's a job that does not use nesting. It starts with an op that returns a number, then uses two ops to convert it from Celsius to Fahrenheit, and finally logs the result:
```python
from dagster import job, op


@op
def return_fifty():
    return 50.0


@op
def add_thirty_two(number):
    return number + 32.0


@op
def multiply_by_one_point_eight(number):
    return number * 1.8


@op
def log_number(context, number):
    # context.log writes the message to Dagster's event log.
    context.log.info(f"number: {number}")


@job
def all_together_unnested():
    # Celsius -> Fahrenheit: multiply by 1.8, then add 32.
    log_number(add_thirty_two(multiply_by_one_point_eight(return_fifty())))
```
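When executed, the nested version of this job, all_together_nested, in which multiply_by_one_point_eight and add_thirty_two are grouped into a celsius_to_fahrenheit sub-graph, does exactly the same thing as the non-nested version. The nesting only organizes the code better and simplifies the presentation of the main graph in Dagit.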
Sub-graphs can have inputs and outputs: in the nested version, celsius_to_fahrenheit accepts a number argument and has a return statement. Sub-graph inputs and outputs connect the inputs and outputs of nodes inside the graph to the inputs and outputs of nodes outside it. In the all_together_nested example:
The number input of the celsius_to_fahrenheit graph is passed as an argument to the multiply_by_one_point_eight op. When an outer graph invokes celsius_to_fahrenheit and supplies the output of another op or sub-graph as the number argument, that output is passed to multiply_by_one_point_eight, which does not execute until the upstream node that produces the output has completed.
The implementation of the celsius_to_fahrenheit graph returns the result of the add_thirty_two op. This means that, when an outer graph invokes celsius_to_fahrenheit and passes its output to the input of another node, the output of add_thirty_two will be provided to that node, and any ops that ultimately receive that input will not execute until add_thirty_two has completed.
To add a description to a graph input that will be displayed in Dagit, provide a GraphIn for that input through the ins argument of the @graph decorator.
Sub-graphs can dictate config for the ops and sub-graphs inside them. If the full config is known at the time that you're defining the graph, you can pass a dictionary to the config argument of the @graph decorator.
```python
from dagster import graph, op


@op(config_schema={"n": float})
def add_n(context, number):
    return number + context.op_config["n"]


@op(config_schema={"m": float})
def multiply_by_m(context, number):
    return number * context.op_config["m"]


# The config for the inner ops is fixed when the graph is defined.
@graph(
    config={
        "multiply_by_m": {"config": {"m": 1.8}},
        "add_n": {"config": {"n": 32.0}},
    }
)
def celsius_to_fahrenheit(number):
    # Multiply first, then add, so that F = C * 1.8 + 32.
    return add_n(multiply_by_m(number))
```
Alternatively, you can use config mapping: provide a function, decorated with @config_mapping, that accepts the config supplied to the graph and generates the config for the nodes inside it.
```python
from dagster import config_mapping, graph, op


@op(config_schema={"n": float})
def add_n(context, number):
    return number + context.op_config["n"]


@op(config_schema={"m": float})
def multiply_by_m(context, number):
    return number * context.op_config["m"]


# Maps the graph-level `from_unit` option to config for the inner ops.
@config_mapping(config_schema={"from_unit": str})
def generate_config(val):
    if val["from_unit"] == "celsius":
        n = 32.0
    elif val["from_unit"] == "kelvin":
        n = -459.67
    else:
        raise ValueError(f"Unsupported from_unit: {val['from_unit']}")

    return {
        "multiply_by_m": {"config": {"m": 1.8}},
        "add_n": {"config": {"n": n}},
    }


@graph(config=generate_config)
def to_fahrenheit(number):
    # F = value * 1.8 + n, where n depends on the source unit.
    return add_n(multiply_by_m(number))
```
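To run a job that contains to_fahrenheit as a sub-graph, you need to provide a value for the from_unit config option in the sub-graph's config; generate_config then derives the config for add_n and multiply_by_m from it.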
To return multiple outputs from a graph, declare each output with a GraphOut in the out argument of the @graph decorator, and return a dictionary whose keys are the output names and whose values are the outputs of the inner nodes.
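```python
from dagster import GraphOut, graph, job, op


@op
def echo(i):
    print(i)


@op
def one() -> int:
    return 1


@op
def hello() -> str:
    return "hello"


# Each key in `out` names a graph output; the returned dict maps those
# names to the outputs of the inner ops.
@graph(out={"x": GraphOut(), "y": GraphOut()})
def graph_with_multiple_outputs():
    x = one()
    y = hello()
    return {"x": x, "y": y}


@job
def subgraph_multiple_outputs_job():
    # Invoking the graph yields one handle per declared output.
    x, y = graph_with_multiple_outputs()
    echo(x)
    echo(y)
```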