Kubernetes is a container orchestration system for automating deployment, scaling, and management of containerized applications. Dagster uses Kubernetes in combination with Helm, a package manager for Kubernetes applications. Using Helm, users specify the configuration of required Kubernetes resources to deploy Dagster through a values file or command-line overrides. References to values.yaml in the following sections refer to Dagster's values.yaml.
Dagster publishes a fully-featured Helm chart to manage installing and running a production-grade Kubernetes deployment of Dagster. For each Dagster component in the chart, Dagster publishes a corresponding Docker image on DockerHub.
kubectl should be configured with your desired Kubernetes cluster. You should understand the basics of Helm, and Helm 3 should be installed. If you are creating your own user code images, Docker should be installed as well.
The Dagster Helm chart is versioned with the same version numbers as the Dagster Python library, and the two should ideally be used together only when their version numbers match.
In the following tutorial, we install the most recent version of the Dagster Helm chart. To use an older version of the chart, pass a --version flag to helm upgrade. If you are using a chart version before 0.11.13, you will also need to update the tags of the Dagster-provided images to match the chart version. Starting with 0.11.13, this is done for you automatically.
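As a rough sketch of pinning the chart, the snippet below assembles a helm upgrade command that passes --version. The release name dagster, the chart reference dagster/dagster, and the version 0.11.13 are assumptions chosen to match the rest of this guide; substitute the version you actually run. The command is only printed, so it can be reviewed before it is run against a cluster.

```shell
# Hypothetical chart version; replace it with the Dagster version you run.
CHART_VERSION="0.11.13"

# Assemble the pinned install command as a string so it can be reviewed first.
PINNED_INSTALL="helm upgrade --install dagster dagster/dagster --version $CHART_VERSION"

# Print the command; run it yourself once it looks right.
echo "$PINNED_INSTALL"
```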
The daemon periodically checks the Runs table in PostgreSQL for runs that are ready to be launched. The daemon also submits runs from schedules and sensors.
The daemon launches runs via the K8sRunLauncher, creating a run worker Job with the image specified in the user code deployment.
The Dagit webserver communicates with the user code deployments via gRPC to fetch the information needed to populate the Dagit UI. Dagit does not load or execute user-written code, so it remains available even when user code contains errors. Dagit frequently checks whether the user code deployment has been updated and, if so, fetches the new information.
Dagit can be horizontally scaled by setting the dagit.replicaCount field in the values.yaml.
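For example, a small override file could raise the replica count. This is only a sketch: the file name dagit-replicas.yaml and the value 2 are arbitrary choices, and only the dagit.replicaCount key comes from the chart.

```shell
# Write a minimal values override (hypothetical file name) that runs two Dagit replicas.
cat > dagit-replicas.yaml <<'EOF'
dagit:
  replicaCount: 2
EOF

# Show the override that would be passed to Helm with -f.
cat dagit-replicas.yaml
```

Passing this file to helm upgrade with -f dagit-replicas.yaml would apply it on top of the chart defaults.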
By default, Dagit launches runs via the K8sRunLauncher, which creates a new Kubernetes Job per run.
The user can connect an external database (e.g. a cloud provider's managed database service, like RDS) or run PostgreSQL on Kubernetes. This database stores runs, event logs, and other metadata, and powers much of the real-time and historical data visible in Dagit. In order to maintain a referenceable history of events, we recommend connecting an external database for most use cases.
The run worker is responsible for executing launched Dagster runs. The run worker uses the same image as the user code deployment at the time the run was submitted. The run worker uses ephemeral compute, and completes once the run is finished. Events that occur during the run are written to the database, and are displayed in Dagit.
The run worker jobs and pods are not automatically deleted, so that users are able to inspect results. It is up to the user to periodically delete old jobs and pods.
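One possible cleanup routine is sketched below. It assumes that run worker Jobs are named dagster-run-<run id>, as in the example output later in this guide, and it only targets Jobs that Kubernetes reports as successfully completed. The function names and the default namespace are hypothetical; review the selection before deleting anything in a shared cluster.

```shell
# Keep only Job names that follow the dagster-run-<run id> naming pattern.
select_run_jobs() {
  grep '/dagster-run-' || true
}

# Delete successfully completed run worker Jobs in a namespace (default: "default").
# Deleting a Job also removes the pods it created.
cleanup_finished_run_jobs() {
  namespace="${1:-default}"
  kubectl get jobs --namespace "$namespace" \
    --field-selector status.successful=1 -o name |
    select_run_jobs |
    while read -r job; do
      kubectl delete --namespace "$namespace" "$job"
    done
}
```

Running cleanup_finished_run_jobs from a scheduled task is one way to keep old Jobs from accumulating; failed Jobs are left in place so they can still be inspected.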
Each Dagster job specifies an executor that determines how the run worker will execute each step of the job. Different executors offer different levels of isolation and concurrency. Common choices are in_process_executor (all steps run serially in a single process in a single pod), multiprocess_executor (multiple processes in a single pod), and k8s_job_executor (each step runs in a separate Kubernetes job). Generally, increasing isolation incurs some additional overhead per step (e.g. starting up a new Kubernetes job vs starting a new process within a pod). The executor can be configured per-run in the execution block.
A user code deployment runs a gRPC server and responds to Dagit's requests for information (such as: "List all of the jobs in each repository" or "What is the dependency structure of job X?"). The user-provided image for the user code deployment must contain a repository definition and all of the packages needed to execute within the repository.
Users can have multiple user code deployments. A common pattern is for each user code deployment to correspond to a different repository.
User code deployments can be updated independently from other Dagster components, including Dagit. As a result, updates to repositories can occur without causing downtime to any other repository or to Dagit. After updating, if there is an error with any repository, an error is surfaced for that repository within Dagit; all other repositories and Dagit will still operate normally.
Build a Docker image containing your Dagster repository and any dependencies needed to execute the business logic in your code.
For reference, here is an example Dockerfile and the corresponding user code directory. Here, we install all the Dagster-related dependencies in the Dockerfile, and then copy the directory containing the implementation of the Dagster repository into the root folder. We'll need to remember the path of this repository in a subsequent step to set up the gRPC server as a deployment.
The example user code repository includes a step_isolated_job job that uses the k8s_job_executor to run each op in its own pod, and a single_pod_job job that runs all ops in a single pod.
For projects with many dependencies, it is recommended that you publish your Python project as a package and install that package in your Dockerfile.
To run these jobs, you'll need an AWS S3 bucket available, and access to a pair of AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY values. This is because the IO Manager uses boto.
This tutorial also has the option of using minio to mock an S3 endpoint locally in K8s. Note that this option uses host.docker.internal to access the host from within Docker. This behavior has only been tested on macOS, so other platforms may need a different configuration.
Skip this step if you'd like to use minio for a local S3 endpoint.
If using S3, create a bucket in your AWS account. For this tutorial, we'll create a bucket called test-bucket. Also, keep your AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY credentials handy. Now, you can create your k8s secrets:
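A minimal sketch of that step follows, assuming both credentials are already exported in your shell. The secret names dagster-aws-access-key-id and dagster-aws-secret-access-key and the helper function name are illustrative assumptions; whatever names you choose must match the ones you later reference in values.yaml.

```shell
# Create one generic secret per credential from the current environment.
# Secret names are assumptions; keep them consistent with your values.yaml.
create_aws_secrets() {
  kubectl create secret generic dagster-aws-access-key-id \
    --from-literal=AWS_ACCESS_KEY_ID="$AWS_ACCESS_KEY_ID"
  kubectl create secret generic dagster-aws-secret-access-key \
    --from-literal=AWS_SECRET_ACCESS_KEY="$AWS_SECRET_ACCESS_KEY"
}
```

Calling create_aws_secrets after exporting both variables creates the secrets in the namespace that kubectl currently points at.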
brew install minio/stable/minio # server
brew install minio/stable/mc # client
mkdir $HOME/miniodata # Prepare a directory for data
minio server $HOME/miniodata # start a server with default user/pass and no TLS
mc --insecure alias set minio http://localhost:9000 minioadmin minioadmin
# See it work
mc ls minio
date > date1.txt # create a sample file
mc cp date1.txt minio://testbucket/date1.txt
export AWS_ACCESS_KEY_ID="minioadmin"
export AWS_SECRET_ACCESS_KEY="minioadmin"
# See the aws cli work
aws --endpoint-url http://localhost:9000 s3 mb s3://test-bucket
aws --endpoint-url http://localhost:9000 s3 cp date1.txt s3://test-bucket/
The Dagster chart repository contains the versioned charts for all Dagster releases. Add the remote url under the namespace dagster to install the Dagster charts.
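As a sketch, registering the repository could look like the following. The URL is an assumption to confirm against the Dagster documentation, and the helper function name is hypothetical.

```shell
# Chart repository URL (assumed; confirm it against the Dagster documentation).
DAGSTER_HELM_REPO_URL="https://dagster-io.github.io/helm"

# Register the repository under the name "dagster" and refresh the local index.
add_dagster_chart_repo() {
  helm repo add dagster "$DAGSTER_HELM_REPO_URL"
  helm repo update
}
```

After calling add_dagster_chart_repo, the chart can be referenced as dagster/dagster in later commands.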
Update the dagster-user-deployments.deployments section of the Dagster chart's values.yaml to include your deployment. Here, we can specify the configuration of the Kubernetes Deployment that will create the gRPC server for Dagit and the daemon to access the user code. The gRPC server is created through the arguments passed to dagsterApiGrpcArgs, which expects a list of arguments for dagster api grpc.
To get access to the Dagster values.yaml, run:
$ helm show values dagster/dagster > values.yaml
The following snippet works for Dagster's example user code image. Since our Dockerfile contains the repository definition in a path, we specify arguments for the gRPC server to find this path under dagsterApiGrpcArgs. Note that if you haven't set up an S3 endpoint, you can only run the job called single_pod_job.
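A configuration along these lines is sketched below, written to a separate override file rather than edited into values.yaml directly. The deployment name matches the pod name shown later in this guide; the file name, the image coordinates, the repository path, and the port are assumptions to replace with the values for your own image.

```shell
# Write a user code deployment override (hypothetical file name and image values).
cat > user-code-values.yaml <<'EOF'
dagster-user-deployments:
  enabled: true
  deployments:
    - name: "k8s-example-user-code-1"
      image:
        # Assumed image coordinates; point these at the image you built.
        repository: "docker.io/dagster/user-code-example"
        tag: latest
        pullPolicy: Always
      # Arguments handed to "dagster api grpc"; the path is an assumed location
      # of the repository definition inside the image.
      dagsterApiGrpcArgs:
        - "--python-file"
        - "/example_project/example_repo/repo.py"
      port: 3030
EOF
```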
By default, this configuration will also be included in the pods for any runs that are launched for the user code deployment. You can disable this behavior for a user code deployment by setting includeConfigInLaunchedRuns.enabled to false for that deployment.
You'll need a slightly different configuration to run the job called step_isolated_job. This is because step_isolated_job uses an s3_pickle_io_manager, so you'll need to provide the user code k8s pods with AWS S3 credentials.
See the set up S3 section for setup instructions. The below snippet works for both AWS S3 and a local S3 endpoint via minio.
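One way to express this is sketched below. Because Helm replaces list values wholesale, the deployment entry is written out in full with an added envSecrets list. The file name, image coordinates, repository path, port, and secret names are all assumptions; the secret names must match the secrets you created earlier.

```shell
# Write a values override (hypothetical file name) whose deployment exposes
# AWS credentials to the user code pods as environment variables.
cat > user-code-s3-values.yaml <<'EOF'
dagster-user-deployments:
  enabled: true
  deployments:
    - name: "k8s-example-user-code-1"
      image:
        # Assumed image coordinates; point these at the image you built.
        repository: "docker.io/dagster/user-code-example"
        tag: latest
        pullPolicy: Always
      dagsterApiGrpcArgs:
        - "--python-file"
        - "/example_project/example_repo/repo.py"
      port: 3030
      # Assumed secret names; each secret's keys become environment variables.
      envSecrets:
        - name: dagster-aws-access-key-id
        - name: dagster-aws-secret-access-key
EOF
```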
Install the Helm chart and create a release. Below, we've named our release dagster. We use helm upgrade --install to create the release if it does not exist; otherwise, the existing dagster release will be modified:
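The install step might be wrapped as in the sketch below. The helper function name and the default file name values.yaml are assumptions; the release name dagster and the use of helm upgrade --install follow the description above.

```shell
# Install or upgrade the release named "dagster" using a values file.
# The default file name values.yaml is an assumption; pass another path as $1.
install_dagster() {
  values_file="${1:-values.yaml}"
  if [ ! -f "$values_file" ]; then
    echo "values file not found: $values_file" >&2
    return 1
  fi
  helm upgrade --install dagster dagster/dagster -f "$values_file"
}
```

Checking for the values file first avoids accidentally installing the chart with only its defaults when the path is mistyped.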
Helm will launch several pods, including PostgreSQL. You can check the status of the installation with kubectl. Note that it might take a few minutes for the pods to move to a Running state.
If everything worked correctly, you should see output like the following:
$ kubectl get pods
NAME                                              READY   STATUS    RESTARTS   AGE
dagster-dagit-645b7d59f8-6lwxh                    1/1     Running   0          11m
dagster-k8s-example-user-code-1-88764b4f4-ds7tn   1/1     Running   0          9m24s
dagster-postgresql-0                              1/1     Running   0          17m
You can introspect the jobs that were launched with kubectl:
$ kubectl get jobs
NAME                                               COMPLETIONS   DURATION   AGE
dagster-run-5ee8a0b3-7ca5-44e6-97a6-8f4bd86ee630   1/1           4s         11s
Now, you can try a run with step isolation. Switch to the step_isolated_job job, changing the default config to point to your S3 bucket if needed, and launch the run.
If you're using minio, change your config to look like this:
resources:
  io_manager:
    config:
      s3_bucket: "test-bucket"
  s3:
    config:
      # This use of host.docker.internal is unique to Mac
      endpoint_url: http://host.docker.internal:9000
      region_name: us-east-1
ops:
  multiply_the_word:
    config:
      factor: 0
    inputs:
      word: ""
Again, you can view the launched jobs:
$ kubectl get jobs
NAME                                               COMPLETIONS   DURATION   AGE
dagster-run-5ee8a0b3-7ca5-44e6-97a6-8f4bd86ee630   1/1           4s         11s
dagster-run-733baf75-fab2-4366-9542-0172fa4ebc1f   1/1           4s         100s
Some of the following commands will be useful if you'd like to debug issues with deploying on Helm:
# Get the Dagit pod's name
$ export DAGIT_POD_NAME=$(kubectl get pods --namespace default \
-l "app.kubernetes.io/name=dagster,app.kubernetes.io/instance=dagster,component=dagit" \
-o jsonpath="{.items[0].metadata.name}")
# Start a shell in the dagit pod
$ kubectl exec --stdin --tty $DAGIT_POD_NAME -- /bin/bash
# Get debug data from $RUN_ID
$ kubectl exec $DAGIT_POD_NAME -- dagster debug export $RUN_ID debug_info.gzip
# Get a list of recently failed runs
$ kubectl exec $DAGIT_POD_NAME -- dagster debug export fakename fakename.gzip
# Get debug output of a failed run - note that this information is also available in Dagit
$ kubectl exec $DAGIT_POD_NAME -- dagster debug export 360d7882-e631-4ac7-8632-43c75cb4d426 debug.gzip
# Extract the debug.gzip from the pod
$ kubectl cp $DAGIT_POD_NAME:debug.gzip debug.gzip
# List config maps
$ kubectl get configmap # Make note of the "user-deployments" configmap
$ kubectl get configmap dagster-dagster-user-deployments-$NAME