Orchestration on Celery + Kubernetes

APIs

dagster_celery_k8s.CeleryK8sRunLauncher RunLauncher

Config Schema:
job_image (Union[dagster.StringSource, None], optional):

Docker image to use for launched Jobs. If this field is empty, the image that was used to originally load the Dagster repository will be used. (Ex: “mycompany.com/dagster-k8s-image:latest”).

image_pull_policy (Union[dagster.StringSource, None], optional):

Image pull policy to set on launched Pods.

image_pull_secrets (Union[List[strict dict], None], optional):

Specifies that Kubernetes should get the credentials from the Secrets named in this list.

service_account_name (Union[dagster.StringSource, None], optional):

The name of the Kubernetes service account under which to run.

env_config_maps (Union[List[dagster.StringSource], None], optional):

A list of custom ConfigMapEnvSource names from which to draw environment variables (using envFrom) for the Job. Default: []. See: https://kubernetes.io/docs/tasks/inject-data-application/define-environment-variable-container/#define-an-environment-variable-for-a-container

env_secrets (Union[List[dagster.StringSource], None], optional):

A list of custom Secret names from which to draw environment variables (using envFrom) for the Job. Default: []. See: https://kubernetes.io/docs/tasks/inject-data-application/distribute-credentials-secure/#configure-all-key-value-pairs-in-a-secret-as-container-environment-variables

env_vars (Union[List[String], None], optional):

A list of environment variables to inject into the Job. Each can be of the form KEY=VALUE or just KEY (in which case the value will be pulled from the current process). Default: []. See: https://kubernetes.io/docs/tasks/inject-data-application/distribute-credentials-secure/#configure-all-key-value-pairs-in-a-secret-as-container-environment-variables
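
The two accepted forms can be illustrated with a minimal sketch. The helper name resolve_env_vars is hypothetical and not part of dagster_celery_k8s; it only mirrors the rule described above, and it assumes that a bare KEY that is unset in the current process resolves to None.

```python
import os
from typing import Dict, List, Optional


def resolve_env_vars(env_vars: List[str]) -> Dict[str, Optional[str]]:
    """Hypothetical helper: expand KEY=VALUE and bare KEY entries into a mapping."""
    resolved: Dict[str, Optional[str]] = {}
    for entry in env_vars:
        if "=" in entry:
            # Split on the first "=" only, so a value may itself contain "=".
            key, value = entry.split("=", 1)
            resolved[key] = value
        else:
            # Bare KEY: read the value from the current process.
            # Returning None when the variable is unset is an assumption of this sketch.
            resolved[entry] = os.environ.get(entry)
    return resolved


os.environ["MY_TOKEN"] = "abc123"  # illustrative value only
print(resolve_env_vars(["LOG_LEVEL=debug", "MY_TOKEN"]))
```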

volume_mounts (List[permissive dict], optional):

A list of volume mounts to include in the job’s container. Default: []. See: https://v1-18.docs.kubernetes.io/docs/reference/generated/kubernetes-api/v1.18/#volumemount-v1-core

Default Value: []

volumes (List[permissive dict], optional):

A list of volumes to include in the Job’s Pod. Default: []. For the many possible volume source types that can be included, see: https://v1-18.docs.kubernetes.io/docs/reference/generated/kubernetes-api/v1.18/#volume-v1-core

Default Value: []
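
Because volumes and volume_mounts are configured separately, each mount has to refer to a declared volume by name. The sketch below shows one possible pair of values written as Python dicts and a small consistency check. The volume name, ConfigMap name, and mount path are made-up examples, and unmatched_mounts is a hypothetical helper rather than part of the library.

```python
from typing import Any, Dict, List

# Illustrative values only: the names and path below are assumptions.
volumes: List[Dict[str, Any]] = [
    {"name": "shared-config", "configMap": {"name": "my-config-map"}},
]
volume_mounts: List[Dict[str, Any]] = [
    {"name": "shared-config", "mountPath": "/opt/shared", "readOnly": True},
]


def unmatched_mounts(
    volumes: List[Dict[str, Any]], volume_mounts: List[Dict[str, Any]]
) -> List[str]:
    """Return the names of mounts that do not refer to any declared volume."""
    declared = {volume["name"] for volume in volumes}
    return [mount["name"] for mount in volume_mounts if mount["name"] not in declared]


# An empty list means every mount points at a declared volume.
print(unmatched_mounts(volumes, volume_mounts))
```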

labels (permissive dict, optional):

Labels to apply to all created pods. See: https://kubernetes.io/docs/concepts/overview/working-with-objects/labels

resources (Union[strict dict, None], optional):

Compute resource requirements for the container. See: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/
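
As a rough illustration, a resources value might look like the dict below. It assumes the field follows the Kubernetes ResourceRequirements shape with requests and limits keys; the quantities are arbitrary examples and unexpected_resource_keys is a hypothetical helper.

```python
from typing import Any, Dict, List

# Arbitrary example quantities, assuming the Kubernetes requests/limits shape.
resources: Dict[str, Dict[str, str]] = {
    "requests": {"cpu": "250m", "memory": "64Mi"},
    "limits": {"cpu": "500m", "memory": "2560Mi"},
}


def unexpected_resource_keys(resources: Dict[str, Any]) -> List[str]:
    """Return top-level keys other than 'requests' and 'limits'."""
    return sorted(set(resources) - {"requests", "limits"})


print(unexpected_resource_keys(resources))
```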

scheduler_name (Union[dagster.StringSource, None], optional):

Use a custom Kubernetes scheduler for launched Pods. See: https://kubernetes.io/docs/tasks/extend-kubernetes/configure-multiple-schedulers/

instance_config_map (dagster.StringSource):

The name of an existing Volume to mount into the pod in order to provide a ConfigMap for the Dagster instance. This Volume should contain a dagster.yaml with appropriate values for run storage, event log storage, etc.

postgres_password_secret (dagster.StringSource, optional):

The name of the Kubernetes Secret where the postgres password can be retrieved. Will be mounted and supplied as an environment variable to the Job Pod. The Secret must contain the key "postgresql-password", which will be exposed in the Job environment as the environment variable DAGSTER_PG_PASSWORD.

dagster_home (dagster.StringSource, optional):

The location of DAGSTER_HOME in the Job container; this is where the dagster.yaml file will be mounted from the instance ConfigMap specified here. Defaults to /opt/dagster/dagster_home.

Default Value: ‘/opt/dagster/dagster_home’

load_incluster_config (Bool, optional):

Set this value if you are running the launcher within a k8s cluster. If True, we assume the launcher is running within the target cluster and load config using kubernetes.config.load_incluster_config. Otherwise, we will use the k8s config specified in kubeconfig_file (using kubernetes.config.load_kube_config) or fall back to the default kubeconfig.

Default Value: True
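
The choice between the two loaders can be sketched as a small decision function. choose_config_loader is a hypothetical helper that only names the kubernetes.config function that would apply and the keyword arguments it would receive; it does not import or call the Kubernetes client.

```python
from typing import Any, Dict, Optional, Tuple


def choose_config_loader(
    load_incluster_config: bool = True,
    kubeconfig_file: Optional[str] = None,
) -> Tuple[str, Dict[str, Any]]:
    """Hypothetical helper: return the loader name and the kwargs it would get."""
    if load_incluster_config:
        # Running inside the target cluster: use the in-cluster service account.
        return ("kubernetes.config.load_incluster_config", {})
    # Otherwise use the given kubeconfig file; None means the default kubeconfig.
    return ("kubernetes.config.load_kube_config", {"config_file": kubeconfig_file})


print(choose_config_loader())
print(choose_config_loader(load_incluster_config=False, kubeconfig_file="/path/to/kubeconfig"))
```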

kubeconfig_file (Union[String, None], optional):

The kubeconfig file from which to load config. Defaults to using the default kubeconfig.

Default Value: None

fail_pod_on_run_failure (Bool, optional):

Whether the launched Kubernetes Jobs and Pods should fail if the Dagster run fails.

broker (Union[dagster.StringSource, None], optional):

The URL of the Celery broker. Default: 'pyamqp://guest@{os.getenv('DAGSTER_CELERY_BROKER_HOST','localhost')}//'.
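
The default above is a template whose host part is read from the environment. A minimal sketch of how it evaluates follows; default_broker_url is a hypothetical name used only for illustration.

```python
import os


def default_broker_url() -> str:
    """Build the default broker URL from DAGSTER_CELERY_BROKER_HOST."""
    # Falls back to localhost when the variable is not set.
    host = os.getenv("DAGSTER_CELERY_BROKER_HOST", "localhost")
    return "pyamqp://guest@{host}//".format(host=host)


print(default_broker_url())
```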

backend (Union[dagster.StringSource, None], optional):

The URL of the Celery results backend. Default: ‘rpc://’.

Default Value: ‘rpc://’

include (List[String], optional):

List of modules every worker should import.

config_source (Union[permissive dict, None], optional):

Additional settings for the Celery app.

retries (selector, optional):

Whether retries are enabled or not. By default, retries are enabled.

Default Value:
{
    "enabled": {}
}
Config Schema:
    enabled (strict dict, optional):
        Default Value: {}
    disabled (strict dict, optional):
        Default Value: {}
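
Since retries is a selector, a value for it carries exactly one of the two keys. The sketch below checks that shape; retries_mode is a hypothetical helper, not a dagster API.

```python
from typing import Any, Dict


def retries_mode(retries: Dict[str, Any]) -> str:
    """Hypothetical helper: return 'enabled' or 'disabled' for a retries value."""
    allowed = {"enabled", "disabled"}
    # A selector holds exactly one key, and it must be one of the allowed names.
    if len(retries) != 1 or not set(retries) <= allowed:
        raise ValueError("retries must contain exactly one of: enabled, disabled")
    return next(iter(retries))


print(retries_mode({"enabled": {}}))   # the documented default value
print(retries_mode({"disabled": {}}))  # turns retries off
```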

In contrast to the K8sRunLauncher, which launches dagster runs as single K8s Jobs, this run launcher is intended for use in concert with dagster_celery_k8s.celery_k8s_job_executor().

With this run launcher, execution is delegated to:

  1. A run worker Kubernetes Job, which traverses the dagster run execution plan and submits steps to Celery queues for execution;

  2. Celery workers, which pick up the step executions submitted to the Celery queues and spawn a step execution Kubernetes Job for each one. See the implementation defined in dagster_celery_k8s.executor.create_k8s_job_task().
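
The two stages above can be pictured with a toy in-memory model. Nothing here touches Celery or Kubernetes: a queue.Queue stands in for a Celery queue, strings stand in for Jobs, and all function and step names are hypothetical. The sketch also ignores step dependencies, which a real execution plan would have to respect.

```python
import queue
from typing import List


def run_worker(plan: List[str], celery_queue: "queue.Queue[str]") -> None:
    """Stand-in for the run worker Job: walk the plan and enqueue each step."""
    for step_key in plan:
        celery_queue.put(step_key)


def celery_worker(celery_queue: "queue.Queue[str]", launched_jobs: List[str]) -> None:
    """Stand-in for a Celery worker: every dequeued step becomes its own Job."""
    while not celery_queue.empty():
        step_key = celery_queue.get()
        # A real worker would create a step execution Kubernetes Job at this point.
        launched_jobs.append("step-job-" + step_key)


steps_queue: "queue.Queue[str]" = queue.Queue()
jobs: List[str] = []
run_worker(["load", "transform", "publish"], steps_queue)
celery_worker(steps_queue, jobs)
print(jobs)
```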

You can configure a Dagster instance to use this RunLauncher by adding a section to your dagster.yaml like the following:

run_launcher:
  module: dagster_celery_k8s
  class: CeleryK8sRunLauncher
  config:
    instance_config_map: "dagster-k8s-instance-config-map"
    dagster_home: "/some/path"
    postgres_password_secret: "dagster-k8s-pg-password"
    broker: "some_celery_broker_url"
    backend: "some_celery_backend_url"

dagster_celery_k8s.celery_k8s_job_executor ExecutorDefinition

Config Schema:
broker (Union[dagster.StringSource, None], optional):

The URL of the Celery broker. Default: 'pyamqp://guest@{os.getenv('DAGSTER_CELERY_BROKER_HOST','localhost')}//'.

backend (Union[dagster.StringSource, None], optional):

The URL of the Celery results backend. Default: ‘rpc://’.

Default Value: ‘rpc://’

include (List[String], optional):

List of modules every worker should import.

config_source (Union[permissive dict, None], optional):

Additional settings for the Celery app.

retries (selector, optional):

Whether retries are enabled or not. By default, retries are enabled.

Default Value:
{
    "enabled": {}
}
Config Schema:
    enabled (strict dict, optional):
        Default Value: {}
    disabled (strict dict, optional):
        Default Value: {}

job_image (Union[dagster.StringSource, None], optional):

Docker image to use for launched Jobs. If this field is empty, the image that was used to originally load the Dagster repository will be used. (Ex: “mycompany.com/dagster-k8s-image:latest”).

image_pull_policy (Union[dagster.StringSource, None], optional):

Image pull policy to set on launched Pods.

image_pull_secrets (Union[List[strict dict], None], optional):

Specifies that Kubernetes should get the credentials from the Secrets named in this list.

service_account_name (Union[dagster.StringSource, None], optional):

The name of the Kubernetes service account under which to run.

env_config_maps (Union[List[dagster.StringSource], None], optional):

A list of custom ConfigMapEnvSource names from which to draw environment variables (using envFrom) for the Job. Default: []. See: https://kubernetes.io/docs/tasks/inject-data-application/define-environment-variable-container/#define-an-environment-variable-for-a-container

env_secrets (Union[List[dagster.StringSource], None], optional):

A list of custom Secret names from which to draw environment variables (using envFrom) for the Job. Default: []. See: https://kubernetes.io/docs/tasks/inject-data-application/distribute-credentials-secure/#configure-all-key-value-pairs-in-a-secret-as-container-environment-variables

env_vars (Union[List[String], None], optional):

A list of environment variables to inject into the Job. Each can be of the form KEY=VALUE or just KEY (in which case the value will be pulled from the current process). Default: []. See: https://kubernetes.io/docs/tasks/inject-data-application/distribute-credentials-secure/#configure-all-key-value-pairs-in-a-secret-as-container-environment-variables

volume_mounts (List[permissive dict], optional):

A list of volume mounts to include in the job’s container. Default: []. See: https://v1-18.docs.kubernetes.io/docs/reference/generated/kubernetes-api/v1.18/#volumemount-v1-core

Default Value: []

volumes (List[permissive dict], optional):

A list of volumes to include in the Job’s Pod. Default: []. For the many possible volume source types that can be included, see: https://v1-18.docs.kubernetes.io/docs/reference/generated/kubernetes-api/v1.18/#volume-v1-core

Default Value: []

labels (permissive dict, optional):

Labels to apply to all created pods. See: https://kubernetes.io/docs/concepts/overview/working-with-objects/labels

resources (Union[strict dict, None], optional):

Compute resource requirements for the container. See: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/

scheduler_name (Union[dagster.StringSource, None], optional):

Use a custom Kubernetes scheduler for launched Pods. See: https://kubernetes.io/docs/tasks/extend-kubernetes/configure-multiple-schedulers/

load_incluster_config (Bool, optional):

Set this value if you are running the launcher within a k8s cluster. If True, we assume the launcher is running within the target cluster and load config using kubernetes.config.load_incluster_config. Otherwise, we will use the k8s config specified in kubeconfig_file (using kubernetes.config.load_kube_config) or fall back to the default kubeconfig. Default: True.

Default Value: True

kubeconfig_file (Union[String, None], optional):

Path to a kubeconfig file to use, if not using default kubeconfig.

job_namespace (dagster.StringSource, optional):

The namespace into which to launch new jobs. Note that any other Kubernetes resources the Job requires (such as the service account) must be present in this namespace. Default: "default"

Default Value: ‘default’

repo_location_name (dagster.StringSource, optional):

The repository location name to use for execution.

Default Value: ‘<<in_process>>’

job_wait_timeout (Float, optional):

Wait this many seconds for a job to complete before marking the run as failed. Defaults to 86400.0 seconds (24 hours).

Default Value: 86400.0

Celery-based executor which launches tasks as Kubernetes Jobs.

The Celery executor exposes config settings for the underlying Celery app under the config_source key. This config corresponds to the “new lowercase settings” introduced in Celery version 4.0, and the object constructed from it is passed to the celery.Celery constructor as its config_source argument. (See https://docs.celeryq.dev/en/stable/userguide/configuration.html for details.)

The executor also exposes the broker, backend, and include arguments to the celery.Celery constructor.

In the most common case, you may want to modify the broker and backend (e.g., to use Redis instead of RabbitMQ). We expect that config_source will be less frequently modified, but that when op executions are especially fast or slow, or when there are different requirements around idempotence or retry, it may make sense to execute dagster jobs with variations on these settings.

To use the celery_k8s_job_executor, set it as the executor_def when defining a job:

from dagster_celery_k8s.executor import celery_k8s_job_executor

from dagster import job


@job(executor_def=celery_k8s_job_executor)
def celery_enabled_job():
    pass

Then you can configure the executor as follows:

execution:
  config:
    job_image: 'my_repo.com/image_name:latest'
    job_namespace: 'some-namespace'
    broker: 'pyamqp://guest@localhost//'  # Optional[str]: The URL of the Celery broker
    backend: 'rpc://' # Optional[str]: The URL of the Celery results backend
    include: ['my_module'] # Optional[List[str]]: Modules every worker should import
    config_source: # Dict[str, Any]: Any additional parameters to pass to the
        #...       # Celery workers. This dict will be passed as the `config_source`
        #...       # argument of celery.Celery().

Note that the YAML you provide here must match the configuration with which your Celery workers were started. If, for example, you point the executor at a different broker than the one your workers are listening to, the workers will never be able to pick up tasks for execution.
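
As a variation on the YAML above, the same run config can be written as a Python dict that points the executor at Redis instead of RabbitMQ and passes two Celery settings through config_source. The Redis URLs and the chosen settings (task_acks_late, worker_prefetch_multiplier) are illustrative assumptions, and non_lowercase_keys is a hypothetical helper that flags keys not written in the lowercase style.

```python
from typing import Any, Dict, List

# Illustrative run config; the URLs, image, and namespace are placeholders.
run_config: Dict[str, Any] = {
    "execution": {
        "config": {
            "job_image": "my_repo.com/image_name:latest",
            "job_namespace": "some-namespace",
            "broker": "redis://localhost:6379/0",
            "backend": "redis://localhost:6379/1",
            "config_source": {
                # Acknowledge a task only after it has finished executing.
                "task_acks_late": True,
                # Let each worker process reserve one task at a time.
                "worker_prefetch_multiplier": 1,
            },
        }
    }
}


def non_lowercase_keys(config_source: Dict[str, Any]) -> List[str]:
    """Return config_source keys that are not written in lowercase."""
    return [key for key in config_source if key != key.lower()]


print(non_lowercase_keys(run_config["execution"]["config"]["config_source"]))
```

The workers would need to be started against the same broker and backend for this config to take effect.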

In deployments where the celery_k8s_job_executor is used, all appropriate celery and dagster_celery commands must be invoked with the -A dagster_celery_k8s.app argument.
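
As a rough sketch, the command lines could be assembled as below. The exact subcommands are assumptions about how a deployment invokes the two CLIs; only the -A dagster_celery_k8s.app argument comes from the text above. The commands are printed here, not executed.

```python
import shlex

CELERY_APP = "dagster_celery_k8s.app"

# Assumed invocation: start a worker through the dagster-celery CLI.
dagster_celery_worker = ["dagster-celery", "worker", "start", "-A", CELERY_APP]

# Assumed invocation: query workers through the celery CLI.
# Here -A is placed before the subcommand, as a global option.
celery_status = ["celery", "-A", CELERY_APP, "status"]

for argv in (dagster_celery_worker, celery_status):
    print(shlex.join(argv))
```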