Running computations on Spark presents unique challenges because, unlike other computations, Spark jobs typically execute on infrastructure that's specialized for Spark, i.e. infrastructure that can network sets of workers into clusters that Spark can run computations against. Spark applications are typically not containerized or executed on Kubernetes, so running Spark code often requires submitting code to a Databricks or EMR cluster.
There are two approaches to writing Dagster ops that invoke Spark computations:
In the first approach, the code inside the op submits a Spark job to an external system like Databricks or EMR, usually pointing to a jar or zip of Python files that contain the actual Spark data transformations and actions.
If you want to use this approach to run a Spark job on Databricks, create_databricks_job_op helps create an op that submits a Spark job using the Databricks REST API.
If you want to run a Spark job against YARN or a Spark Standalone cluster, you can use create_shell_command_op to create an op that invokes spark-submit.
This is the easiest approach for migrating existing Spark jobs, and it's the only approach that works for Spark jobs written in Java or Scala. The downside is that it loses out on some of the benefits of Dagster: the implementation of each op is bound to the execution environment, so you can't run your Spark transformations without relying on external infrastructure. As a result, writing unit tests is cumbersome.
In the second approach, the code inside the op consists of pure logical data transformations on Spark DataFrames or RDDs. The op-decorated function accepts DataFrames as parameters and returns DataFrames when it completes. An IO manager handles writing and reading the DataFrames to and from persistent storage. The "Running PySpark code in op" example below shows what this looks like.
If you want your Spark driver to run inside a Spark cluster, you can use a "step launcher" resource that informs Dagster how to launch the op. The step launcher resource is responsible for invoking spark-submit or submitting the job to Databricks or EMR. The "Submitting PySpark ops on EMR" example shows what this looks like for EMR.
The advantage of this approach is a very clean local testing story. You can run an entire job of Spark ops in a single process. You can use IO managers to abstract away IO - storing outputs on the local filesystem during local development and in the cloud in production.
The downside is that this approach only works with PySpark, and setting up a step launcher can be difficult. We currently provide an emr_pyspark_step_launcher and a databricks_pyspark_step_launcher, but if you need to submit your Spark job to a different kind of cluster, writing your own can be time-consuming. You also need to install the Dagster library itself on the cluster.
Passing PySpark DataFrames between ops requires a little extra care compared to other data types, for a couple of reasons:
Spark has a lazy execution model, which means that PySpark won't process any data until an action like write or collect is called on a DataFrame.
PySpark DataFrames cannot be pickled, which means that IO managers like the fs_io_manager won't work for them.
In this example, we've defined an IOManager that knows how to store and retrieve PySpark DataFrames that are produced and consumed by ops.
This example assumes that all the outputs within the Dagster job will be PySpark DataFrames and stored in the same way. To learn how to use different IO managers for different outputs within the same Dagster job, take a look at the IO Manager concept page.
This example writes out DataFrames to the local file system, but can be tweaked to write to cloud object stores like S3 by changing the write and read invocations.
This example demonstrates how to use the emr_pyspark_step_launcher to have an op run as a Spark step on an EMR cluster. In it, each of the two ops will be executed as a separate EMR step on the same EMR cluster.