Source code for dagster_pyspark.resources
import dagster._check as check
from dagster import resource
from dagster_spark.configs_spark import spark_config
from dagster_spark.utils import flatten_dict
from pyspark.sql import SparkSession

def spark_session_from_config(spark_conf=None):
    spark_conf = check.opt_dict_param(spark_conf, "spark_conf")
    builder = SparkSession.builder
    flat = flatten_dict(spark_conf)
    for key, value in flat:
        builder = builder.config(key, value)
    return builder.getOrCreate()

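# A minimal usage sketch, assuming flatten_dict collapses the nested config dict
# produced by spark_config() into dotted (key, value) pairs, e.g.
# {"spark": {"executor": {"memory": "2g"}}} -> ("spark.executor.memory", "2g").
# Kept in comments so importing this module has no side effects:
#
#     session = spark_session_from_config({"spark": {"executor": {"memory": "2g"}}})
#     # roughly equivalent to:
#     # SparkSession.builder.config("spark.executor.memory", "2g").getOrCreate()
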
class PySparkResource:
    def __init__(self, spark_conf):
        self._spark_session = spark_session_from_config(spark_conf)

    @property
    def spark_session(self):
        return self._spark_session

    @property
    def spark_context(self):
        return self.spark_session.sparkContext

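# Hedged sketch of direct use (ordinarily you would go through pyspark_resource
# below rather than constructing this class yourself); the conf dict shown is an
# illustrative assumption, not a required shape:
#
#     spark = PySparkResource({"spark": {"executor": {"memory": "2g"}}})
#     df = spark.spark_session.range(10)            # session created eagerly in __init__
#     rdd = spark.spark_context.parallelize([1, 2]) # underlying SparkContext also exposed
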
@resource({"spark_conf": spark_config()})
def pyspark_resource(init_context):
    """This resource provides access to a PySpark SparkSession for executing PySpark code within
    Dagster.

    Example:
        .. code-block:: python

            @op(required_resource_keys={"pyspark"})
            def my_op(context):
                spark_session = context.resources.pyspark.spark_session
                dataframe = spark_session.read.json("examples/src/main/resources/people.json")

            my_pyspark_resource = pyspark_resource.configured(
                {"spark_conf": {"spark.executor.memory": "2g"}}
            )

            @job(resource_defs={"pyspark": my_pyspark_resource})
            def my_spark_job():
                my_op()
    """
    return PySparkResource(init_context.resource_config["spark_conf"])

class LazyPySparkResource:
    def __init__(self, spark_conf):
        self._spark_session = None
        self._spark_conf = spark_conf

    def _init_session(self):
        if self._spark_session is None:
            self._spark_session = spark_session_from_config(self._spark_conf)

    @property
    def spark_session(self):
        self._init_session()
        return self._spark_session

    @property
    def spark_context(self):
        self._init_session()
        return self._spark_session.sparkContext

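# Illustrative sketch of the lazy behavior (variable names are hypothetical):
# constructing the resource does not start Spark; the session is built on the
# first property access and cached for subsequent ones:
#
#     lazy = LazyPySparkResource({"spark": {"executor": {"memory": "2g"}}})
#     # no SparkSession exists yet; the next line triggers creation
#     session = lazy.spark_session
#     assert session is lazy.spark_session  # same cached instance thereafter
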
@resource({"spark_conf": spark_config()})
def lazy_pyspark_resource(init_context):
"""This resource provides access to a lazily-created PySpark SparkSession for executing PySpark
code within Dagster, avoiding the creation of a SparkSession object until the .spark_session attribute
of the resource is accessed. This is helpful for avoiding the creation (and startup penalty) of a SparkSession
until it is actually needed / accessed by an op or IOManager.
Example:
.. code-block:: python
@op(required_resource_keys={"lazy_pyspark"})
def my_op(context):
spark_session = context.resources.lazy_pyspark.spark_session
dataframe = spark_session.read.json("examples/src/main/resources/people.json")
my_pyspark_resource = lazy_pyspark_resource.configured(
{"spark_conf": {"spark.executor.memory": "2g"}}
)
@job(resource_defs={"lazy_pyspark": my_pyspark_resource})
def my_spark_job():
my_op()
"""
return LazyPySparkResource(init_context.resource_config["spark_conf"])