chart/templates/dag-processor/dag-processor-deployment.yaml (272 lines of code) (raw):
{{/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/}}
################################
## Airflow Dag Processor Deployment
#################################
{{- if semverCompare ">=2.3.0" .Values.airflowVersion }}
{{- $enabled := .Values.dagProcessor.enabled }}
{{- if eq $enabled nil}}
{{ $enabled = ternary true false (semverCompare ">=3.0.0" .Values.airflowVersion) }}
{{- end }}
{{- if $enabled }}
{{- $nodeSelector := or .Values.dagProcessor.nodeSelector .Values.nodeSelector }}
{{- $affinity := or .Values.dagProcessor.affinity .Values.affinity }}
{{- $tolerations := or .Values.dagProcessor.tolerations .Values.tolerations }}
{{- $topologySpreadConstraints := or .Values.dagProcessor.topologySpreadConstraints .Values.topologySpreadConstraints }}
{{- $revisionHistoryLimit := or .Values.dagProcessor.revisionHistoryLimit .Values.revisionHistoryLimit }}
{{- $securityContext := include "airflowPodSecurityContext" (list . .Values.dagProcessor) }}
{{- $containerSecurityContext := include "containerSecurityContext" (list . .Values.dagProcessor) }}
{{- $containerSecurityContextLogGroomerSidecar := include "containerSecurityContext" (list . .Values.dagProcessor.logGroomerSidecar) }}
{{- $containerSecurityContextWaitForMigrations := include "containerSecurityContext" (list . .Values.dagProcessor.waitForMigrations) }}
{{- $containerLifecycleHooks := or .Values.dagProcessor.containerLifecycleHooks .Values.containerLifecycleHooks }}
apiVersion: apps/v1
kind: Deployment
metadata:
name: {{ include "airflow.fullname" . }}-dag-processor
labels:
tier: airflow
component: dag-processor
release: {{ .Release.Name }}
chart: "{{ .Chart.Name }}-{{ .Chart.Version }}"
heritage: {{ .Release.Service }}
{{- with .Values.labels }}
{{- toYaml . | nindent 4 }}
{{- end }}
{{- if .Values.dagProcessor.annotations }}
annotations: {{- toYaml .Values.dagProcessor.annotations | nindent 4 }}
{{- end }}
spec:
replicas: {{ .Values.dagProcessor.replicas }}
{{- if $revisionHistoryLimit }}
revisionHistoryLimit: {{ $revisionHistoryLimit }}
{{- end }}
selector:
matchLabels:
tier: airflow
component: dag-processor
release: {{ .Release.Name }}
{{- if .Values.dagProcessor.strategy }}
strategy: {{- toYaml .Values.dagProcessor.strategy | nindent 4 }}
{{- end }}
template:
metadata:
labels:
tier: airflow
component: dag-processor
release: {{ .Release.Name }}
{{- with .Values.labels }}
{{- toYaml . | nindent 8 }}
{{- end }}
annotations:
checksum/metadata-secret: {{ include (print $.Template.BasePath "/secrets/metadata-connection-secret.yaml") . | sha256sum }}
checksum/pgbouncer-config-secret: {{ include (print $.Template.BasePath "/secrets/pgbouncer-config-secret.yaml") . | sha256sum }}
checksum/airflow-config: {{ include (print $.Template.BasePath "/configmaps/configmap.yaml") . | sha256sum }}
checksum/extra-configmaps: {{ include (print $.Template.BasePath "/configmaps/extra-configmaps.yaml") . | sha256sum }}
checksum/extra-secrets: {{ include (print $.Template.BasePath "/secrets/extra-secrets.yaml") . | sha256sum }}
{{- if .Values.dagProcessor.safeToEvict }}
cluster-autoscaler.kubernetes.io/safe-to-evict: "true"
{{- end }}
{{- if .Values.airflowPodAnnotations }}
{{- toYaml .Values.airflowPodAnnotations | nindent 8 }}
{{- end }}
{{- if .Values.dagProcessor.podAnnotations }}
{{- toYaml .Values.dagProcessor.podAnnotations | nindent 8 }}
{{- end }}
spec:
{{- if .Values.dagProcessor.priorityClassName }}
priorityClassName: {{ .Values.dagProcessor.priorityClassName }}
{{- end }}
nodeSelector: {{- toYaml $nodeSelector | nindent 8 }}
affinity:
{{- if $affinity }}
{{- toYaml $affinity | nindent 8 }}
{{- else }}
podAntiAffinity:
preferredDuringSchedulingIgnoredDuringExecution:
- podAffinityTerm:
labelSelector:
matchLabels:
component: dag-processor
topologyKey: kubernetes.io/hostname
weight: 100
{{- end }}
{{- if .Values.schedulerName }}
schedulerName: {{ .Values.schedulerName }}
{{- end }}
tolerations: {{- toYaml $tolerations | nindent 8 }}
topologySpreadConstraints: {{- toYaml $topologySpreadConstraints | nindent 8 }}
terminationGracePeriodSeconds: {{ .Values.dagProcessor.terminationGracePeriodSeconds }}
restartPolicy: Always
serviceAccountName: {{ include "dagProcessor.serviceAccountName" . }}
securityContext: {{ $securityContext | nindent 8 }}
{{- if or .Values.registry.secretName .Values.registry.connection }}
imagePullSecrets:
- name: {{ template "registry_secret" . }}
{{- end }}
initContainers:
{{- if .Values.dagProcessor.waitForMigrations.enabled }}
- name: wait-for-airflow-migrations
resources: {{- toYaml .Values.dagProcessor.resources | nindent 12 }}
image: {{ template "airflow_image_for_migrations" . }}
imagePullPolicy: {{ .Values.images.airflow.pullPolicy }}
securityContext: {{ $containerSecurityContextWaitForMigrations | nindent 12 }}
volumeMounts:
{{- if .Values.volumeMounts }}
{{- toYaml .Values.volumeMounts | nindent 12 }}
{{- end }}
{{- if .Values.dagProcessor.extraVolumeMounts }}
{{- tpl (toYaml .Values.dagProcessor.extraVolumeMounts) . | nindent 12 }}
{{- end }}
{{- include "airflow_config_mount" . | nindent 12 }}
args: {{- include "wait-for-migrations-command" . | indent 10 }}
envFrom: {{- include "custom_airflow_environment_from" . | default "\n []" | indent 10 }}
env:
{{- include "custom_airflow_environment" . | indent 10 }}
{{- include "standard_airflow_environment" . | indent 10 }}
{{- if .Values.dagProcessor.waitForMigrations.env }}
{{- tpl (toYaml .Values.dagProcessor.waitForMigrations.env) $ | nindent 12 }}
{{- end }}
{{- end }}
{{- if and (.Values.dags.gitSync.enabled) (not .Values.dags.persistence.enabled) }}
{{- include "git_sync_container" (dict "Values" .Values "is_init" "true" "Template" .Template) | nindent 8 }}
{{- end }}
{{- if .Values.dagProcessor.extraInitContainers }}
{{- tpl (toYaml .Values.dagProcessor.extraInitContainers) . | nindent 8 }}
{{- end }}
containers:
- name: dag-processor
image: {{ template "airflow_image" . }}
imagePullPolicy: {{ .Values.images.airflow.pullPolicy }}
securityContext: {{ $containerSecurityContext | nindent 12 }}
{{- if $containerLifecycleHooks }}
lifecycle: {{- tpl (toYaml $containerLifecycleHooks) . | nindent 12 }}
{{- end }}
{{- if .Values.dagProcessor.command }}
command: {{ tpl (toYaml .Values.dagProcessor.command) . | nindent 12 }}
{{- end }}
{{- if .Values.dagProcessor.args }}
args: {{ tpl (toYaml .Values.dagProcessor.args) . | nindent 12 }}
{{- end }}
resources: {{- toYaml .Values.dagProcessor.resources | nindent 12 }}
volumeMounts:
{{- if .Values.volumeMounts }}
{{- toYaml .Values.volumeMounts | nindent 12 }}
{{- end }}
{{- if .Values.dagProcessor.extraVolumeMounts }}
{{- tpl (toYaml .Values.dagProcessor.extraVolumeMounts) . | nindent 12 }}
{{- end }}
- name: logs
mountPath: {{ template "airflow_logs" . }}
{{- include "airflow_config_mount" . | nindent 12 }}
{{- if or .Values.dags.persistence.enabled .Values.dags.gitSync.enabled }}
{{- include "airflow_dags_mount" . | nindent 12 }}
{{- end }}
envFrom: {{- include "custom_airflow_environment_from" . | default "\n []" | indent 10 }}
env:
{{- include "custom_airflow_environment" . | indent 10 }}
{{- include "standard_airflow_environment" . | indent 10 }}
{{- include "container_extra_envs" (list . .Values.dagProcessor.env) | indent 10 }}
livenessProbe:
initialDelaySeconds: {{ .Values.dagProcessor.livenessProbe.initialDelaySeconds }}
timeoutSeconds: {{ .Values.dagProcessor.livenessProbe.timeoutSeconds }}
failureThreshold: {{ .Values.dagProcessor.livenessProbe.failureThreshold }}
periodSeconds: {{ .Values.dagProcessor.livenessProbe.periodSeconds }}
exec:
command:
{{- if .Values.dagProcessor.livenessProbe.command }}
{{- toYaml .Values.dagProcessor.livenessProbe.command | nindent 16 }}
{{- else }}
{{- include "dag_processor_liveness_check_command" . | indent 14 }}
{{- end }}
{{- if and (.Values.dags.gitSync.enabled) (not .Values.dags.persistence.enabled) }}
{{- include "git_sync_container" . | indent 8 }}
{{- end }}
{{- if .Values.dagProcessor.logGroomerSidecar.enabled }}
- name: dag-processor-log-groomer
resources: {{- toYaml .Values.dagProcessor.logGroomerSidecar.resources | nindent 12 }}
image: {{ template "airflow_image" . }}
imagePullPolicy: {{ .Values.images.airflow.pullPolicy }}
securityContext: {{ $containerSecurityContextLogGroomerSidecar | nindent 12 }}
{{- if .Values.dagProcessor.logGroomerSidecar.command }}
command: {{ tpl (toYaml .Values.dagProcessor.logGroomerSidecar.command) . | nindent 12 }}
{{- end }}
{{- if .Values.dagProcessor.logGroomerSidecar.args }}
args: {{- tpl (toYaml .Values.dagProcessor.logGroomerSidecar.args) . | nindent 12 }}
{{- end }}
env:
{{- if .Values.dagProcessor.logGroomerSidecar.retentionDays }}
- name: AIRFLOW__LOG_RETENTION_DAYS
value: "{{ .Values.dagProcessor.logGroomerSidecar.retentionDays }}"
{{- end }}
{{- if .Values.dagProcessor.logGroomerSidecar.frequencyMinutes }}
- name: AIRFLOW__LOG_CLEANUP_FREQUENCY_MINUTES
value: "{{ .Values.dagProcessor.logGroomerSidecar.frequencyMinutes }}"
{{- end }}
- name: AIRFLOW_HOME
value: "{{ .Values.airflowHome }}"
{{- if .Values.dagProcessor.logGroomerSidecar.env }}
{{- tpl (toYaml .Values.dagProcessor.logGroomerSidecar.env) $ | nindent 12 }}
{{- end }}
volumeMounts:
- name: logs
mountPath: {{ template "airflow_logs" . }}
{{- if .Values.volumeMounts }}
{{- toYaml .Values.volumeMounts | nindent 12 }}
{{- end }}
{{- if .Values.dagProcessor.extraVolumeMounts }}
{{- tpl (toYaml .Values.dagProcessor.extraVolumeMounts) . | nindent 12 }}
{{- end }}
{{- if or .Values.webserver.webserverConfig .Values.webserver.webserverConfigConfigMapName }}
{{- include "airflow_webserver_config_mount" . | nindent 12 }}
{{- end }}
{{- end }}
{{- if .Values.dagProcessor.extraContainers }}
{{- tpl (toYaml .Values.dagProcessor.extraContainers) . | nindent 8 }}
{{- end }}
volumes:
- name: config
configMap:
name: {{ template "airflow_config" . }}
{{- if or .Values.webserver.webserverConfig .Values.webserver.webserverConfigConfigMapName }}
- name: webserver-config
configMap:
name: {{ template "airflow_webserver_config_configmap_name" . }}
{{- end }}
{{- if .Values.dags.persistence.enabled }}
- name: dags
persistentVolumeClaim:
claimName: {{ template "airflow_dags_volume_claim" . }}
{{- else if .Values.dags.gitSync.enabled }}
- name: dags
emptyDir: {{- toYaml (default (dict) .Values.dags.gitSync.emptyDirConfig) | nindent 12 }}
{{- end }}
{{- if and .Values.dags.gitSync.enabled (or .Values.dags.gitSync.sshKeySecret .Values.dags.gitSync.sshKey) }}
{{- include "git_sync_ssh_key_volume" . | indent 8 }}
{{- end }}
{{- if .Values.volumes }}
{{- toYaml .Values.volumes | nindent 8 }}
{{- end }}
{{- if .Values.dagProcessor.extraVolumes }}
{{- tpl (toYaml .Values.dagProcessor.extraVolumes) . | nindent 8 }}
{{- end }}
{{- if .Values.logs.persistence.enabled }}
- name: logs
persistentVolumeClaim:
claimName: {{ template "airflow_logs_volume_claim" . }}
{{- else }}
- name: logs
emptyDir: {{- toYaml (default (dict) .Values.logs.emptyDirConfig) | nindent 12 }}
{{- end }}
{{- end }}
{{- end }}