charts/pulsar/templates/broker-statefulset.yaml (342 lines of code) (raw):

# # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # {{- if .Values.components.broker }} apiVersion: apps/v1 kind: StatefulSet metadata: {{- $stsName := printf "%s-%s" (include "pulsar.fullname" .) .Values.broker.component }} name: {{ $stsName | quote }} {{- $namespace := include "pulsar.namespace" . }} namespace: {{ $namespace | quote }} annotations: {{ .Values.broker.appAnnotations | toYaml | nindent 4 }} labels: {{- include "pulsar.standardLabels" . | nindent 4 }} component: {{ .Values.broker.component }} spec: serviceName: "{{ template "pulsar.fullname" . }}-{{ .Values.broker.component }}" {{- if not .Values.broker.autoscaling.enabled }} replicas: {{ .Values.broker.replicaCount }} {{- end }} selector: matchLabels: {{- include "pulsar.matchLabels" . | nindent 6 }} component: {{ .Values.broker.component }} updateStrategy: type: RollingUpdate {{- /* When functions are enabled, podManagementPolicy must be OrderedReady to ensure that other started brokers are available via DNS for the function worker to connect to. Since podManagementPolicy is immutable, this rule is only applied when the broker is first installed. */}} {{- $stsObj := lookup "apps/v1" "StatefulSet" $namespace $stsName }} {{- if $stsObj }} podManagementPolicy: {{ $stsObj.spec.podManagementPolicy }} {{- else }} {{- if .Values.broker.podManagementPolicy }} podManagementPolicy: {{ .Values.broker.podManagementPolicy }} {{- else if not .Values.components.functions }} podManagementPolicy: Parallel {{- else }} podManagementPolicy: OrderedReady {{- end }} {{- end }} template: metadata: labels: {{- include "pulsar.template.labels" . | nindent 8 }} component: {{ .Values.broker.component }} annotations: {{- if not .Values.broker.podMonitor.enabled }} prometheus.io/scrape: "true" prometheus.io/port: "{{ .Values.broker.ports.http }}" {{- end }} {{- if .Values.broker.restartPodsOnConfigMapChange }} checksum/config: {{ include (print $.Template.BasePath "/broker-configmap.yaml") . | sha256sum }} {{- end }} {{- with .Values.broker.annotations }} {{ toYaml . | indent 8 }} {{- end }} spec: serviceAccountName: "{{ template "pulsar.fullname" . }}-{{ .Values.broker.component }}-acct" {{- if .Values.broker.nodeSelector }} nodeSelector: {{ toYaml .Values.broker.nodeSelector | indent 8 }} {{- end }} {{- if .Values.broker.tolerations }} tolerations: {{ toYaml .Values.broker.tolerations | indent 8 }} {{- end }} {{- if .Values.broker.topologySpreadConstraints }} topologySpreadConstraints: {{- toYaml .Values.broker.topologySpreadConstraints | nindent 8 }} {{- end }} affinity: {{- if and .Values.affinity.anti_affinity .Values.broker.affinity.anti_affinity}} podAntiAffinity: {{- if eq .Values.broker.affinity.type "requiredDuringSchedulingIgnoredDuringExecution"}} {{ .Values.broker.affinity.type }}: - labelSelector: matchExpressions: - key: "app" operator: In values: - "{{ template "pulsar.name" . }}" - key: "release" operator: In values: - {{ .Release.Name }} - key: "component" operator: In values: - {{ .Values.broker.component }} topologyKey: {{ .Values.broker.affinity.anti_affinity_topology_key }} {{- else }} {{ .Values.broker.affinity.type }}: - weight: 100 podAffinityTerm: labelSelector: matchExpressions: - key: "app" operator: In values: - "{{ template "pulsar.name" . }}" - key: "release" operator: In values: - {{ .Release.Name }} - key: "component" operator: In values: - {{ .Values.broker.component }} topologyKey: {{ .Values.broker.affinity.anti_affinity_topology_key }} {{- end }} {{- end }} terminationGracePeriodSeconds: {{ .Values.broker.gracePeriod }} initContainers: {{- if and .Values.components.zookeeper .Values.broker.waitZookeeperTimeout (gt (.Values.broker.waitZookeeperTimeout | int) 0) }} # This init container will wait for zookeeper to be ready before # deploying the bookies - name: wait-zookeeper-ready image: "{{ template "pulsar.imageFullName" (dict "image" .Values.images.broker "root" .) }}" imagePullPolicy: "{{ template "pulsar.imagePullPolicy" (dict "image" .Values.images.broker "root" .) }}" resources: {{ toYaml .Values.initContainer.resources | nindent 10 }} command: ["timeout", "{{ .Values.broker.waitZookeeperTimeout }}", "sh", "-c"] args: - | {{- include "pulsar.broker.zookeeper.tls.settings" . | nindent 12 }} export PULSAR_MEM="-Xmx128M"; {{- if .Values.pulsar_metadata.configurationStore }} until timeout 15 bin/pulsar zookeeper-shell -server {{ template "pulsar.configurationStore.connect" . }} get {{ .Values.configurationStoreMetadataPrefix }}/admin/clusters/{{ template "pulsar.cluster.name" . }}; do {{- end }} {{- if not .Values.pulsar_metadata.configurationStore }} until timeout 15 bin/pulsar zookeeper-shell -server {{ template "pulsar.zookeeper.connect" . }} get {{ .Values.metadataPrefix }}/admin/clusters/{{ template "pulsar.cluster.name" . }}; do {{- end }} echo "pulsar cluster {{ template "pulsar.cluster.name" . }} isn't initialized yet ... check in 3 seconds ..." && sleep 3; done; volumeMounts: {{- include "pulsar.broker.certs.volumeMounts" . | nindent 8 }} {{- end }} {{- if and .Values.components.oxia .Values.broker.waitOxiaTimeout (gt (.Values.broker.waitOxiaTimeout | int) 0) }} - name: wait-oxia-ready image: "{{ template "pulsar.imageFullName" (dict "image" .Values.images.broker "root" .) }}" imagePullPolicy: "{{ template "pulsar.imagePullPolicy" (dict "image" .Values.images.broker "root" .) }}" resources: {{ toYaml .Values.initContainer.resources | nindent 10 }} command: ["timeout", "{{ .Values.broker.waitOxiaTimeout }}", "sh", "-c"] args: - | until nslookup {{ template "pulsar.oxia.server.service" . }}; do sleep 3; done; {{- end }} {{- if and .Values.broker.waitBookkeeperTimeout (gt (.Values.broker.waitBookkeeperTimeout | int) 0) }} # This init container will wait for bookkeeper to be ready before # deploying the broker - name: wait-bookkeeper-ready image: "{{ template "pulsar.imageFullName" (dict "image" .Values.images.broker "root" .) }}" imagePullPolicy: "{{ template "pulsar.imagePullPolicy" (dict "image" .Values.images.broker "root" .) }}" resources: {{ toYaml .Values.initContainer.resources | nindent 10 }} command: ["timeout", "{{ .Values.broker.waitBookkeeperTimeout }}", "sh", "-c"] args: - | {{- include "pulsar.broker.zookeeper.tls.settings" . | nindent 12 }} bin/apply-config-from-env.py conf/bookkeeper.conf; export BOOKIE_MEM="-Xmx128M"; until timeout 15 bin/bookkeeper shell whatisinstanceid; do echo "bookkeeper cluster is not initialized yet. backoff for 3 seconds ..."; sleep 3; done; echo "bookkeeper cluster is already initialized"; bookieServiceNumber="$(nslookup -timeout=10 {{ template "pulsar.fullname" . }}-{{ .Values.bookkeeper.component }} | grep Name | wc -l)"; until [ ${bookieServiceNumber} -ge {{ .Values.broker.configData.managedLedgerDefaultEnsembleSize }} ]; do echo "bookkeeper cluster {{ template "pulsar.cluster.name" . }} isn't ready yet ... check in 10 seconds ..."; sleep 10; bookieServiceNumber="$(nslookup -timeout=10 {{ template "pulsar.fullname" . }}-{{ .Values.bookkeeper.component }} | grep Name | wc -l)"; done; echo "bookkeeper cluster is ready"; envFrom: - configMapRef: name: "{{ template "pulsar.fullname" . }}-{{ .Values.bookkeeper.component }}" volumeMounts: {{- include "pulsar.broker.certs.volumeMounts" . | nindent 10 }} {{- end }} {{- if .Values.broker.initContainers }} {{- toYaml .Values.broker.initContainers | nindent 6 }} {{- end }} containers: - name: "{{ template "pulsar.fullname" . }}-{{ .Values.broker.component }}" image: "{{ template "pulsar.imageFullName" (dict "image" .Values.images.broker "root" .) }}" imagePullPolicy: "{{ template "pulsar.imagePullPolicy" (dict "image" .Values.images.broker "root" .) }}" {{- if .Values.broker.probe.liveness.enabled }} livenessProbe: httpGet: path: /status.html port: {{ .Values.broker.ports.http }} initialDelaySeconds: {{ .Values.broker.probe.liveness.initialDelaySeconds }} periodSeconds: {{ .Values.broker.probe.liveness.periodSeconds }} timeoutSeconds: {{ .Values.broker.probe.liveness.timeoutSeconds }} failureThreshold: {{ .Values.broker.probe.liveness.failureThreshold }} {{- end }} {{- if .Values.broker.probe.readiness.enabled }} readinessProbe: httpGet: path: /status.html port: {{ .Values.broker.ports.http }} initialDelaySeconds: {{ .Values.broker.probe.readiness.initialDelaySeconds }} periodSeconds: {{ .Values.broker.probe.readiness.periodSeconds }} timeoutSeconds: {{ .Values.broker.probe.readiness.timeoutSeconds }} failureThreshold: {{ .Values.broker.probe.readiness.failureThreshold }} {{- end }} {{- if .Values.broker.probe.startup.enabled }} startupProbe: httpGet: path: /status.html port: {{ .Values.broker.ports.http }} initialDelaySeconds: {{ .Values.broker.probe.startup.initialDelaySeconds }} periodSeconds: {{ .Values.broker.probe.startup.periodSeconds }} timeoutSeconds: {{ .Values.broker.probe.startup.timeoutSeconds }} failureThreshold: {{ .Values.broker.probe.startup.failureThreshold }} {{- end }} {{- if .Values.broker.resources }} resources: {{ toYaml .Values.broker.resources | indent 10 }} {{- end }} command: ["sh", "-c"] args: - | {{- if .Values.broker.additionalCommand }} {{ .Values.broker.additionalCommand }} {{- end }} bin/apply-config-from-env.py conf/broker.conf; bin/gen-yml-from-env.py conf/functions_worker.yml; echo "OK" > "${statusFilePath:-status}"; {{- if .Values.components.zookeeper }} {{- include "pulsar.broker.zookeeper.tls.settings" . | nindent 10 }} timeout 15 bin/pulsar zookeeper-shell -server {{ template "pulsar.zookeeper.connect" . }} get {{ template "pulsar.broker.znode" . }}; while [ $? -eq 0 ]; do echo "broker {{ template "pulsar.broker.hostname" . }} znode still exists ... check in 10 seconds ..."; sleep 10; timeout 15 bin/pulsar zookeeper-shell -server {{ template "pulsar.zookeeper.connect" . }} get {{ template "pulsar.broker.znode" . }}; done; {{- end }} cat conf/pulsar_env.sh; OPTS="${OPTS} -Dlog4j2.formatMsgNoLookups=true" exec bin/pulsar broker; ports: # prometheus needs to access /metrics endpoint - name: http containerPort: {{ .Values.broker.ports.http }} {{- if or (not .Values.tls.enabled) (not .Values.tls.broker.enabled) }} - name: "{{ .Values.tcpPrefix }}pulsar" containerPort: {{ .Values.broker.ports.pulsar }} {{- end }} {{- if and .Values.tls.enabled .Values.tls.broker.enabled }} - name: https containerPort: {{ .Values.broker.ports.https }} - name: "{{ .Values.tlsPrefix }}pulsarssl" containerPort: {{ .Values.broker.ports.pulsarssl }} {{- end }} envFrom: - configMapRef: name: "{{ template "pulsar.fullname" . }}-{{ .Values.broker.component }}" volumeMounts: {{- if .Values.auth.authentication.enabled }} {{- if eq .Values.auth.authentication.provider "jwt" }} - mountPath: "/pulsar/keys" name: token-keys readOnly: true - mountPath: "/pulsar/tokens" name: broker-token readOnly: true {{- end }} {{- end }} {{- if .Values.broker.storageOffload.driver }} {{- if eq .Values.broker.storageOffload.driver "google-cloud-storage" }} - name: gcp-service-account readOnly: true mountPath: /pulsar/gcp-service-account {{- end }} {{- end }} {{- if .Values.broker.extraVolumeMounts }} {{ toYaml .Values.broker.extraVolumeMounts | indent 10 }} {{- end }} {{- include "pulsar.broker.certs.volumeMounts" . | nindent 10 }} env: {{- if and (and .Values.broker.storageOffload (eq .Values.broker.storageOffload.driver "aws-s3")) .Values.broker.storageOffload.secret }} - name: AWS_ACCESS_KEY_ID valueFrom: secretKeyRef: name: {{ .Values.broker.storageOffload.secret }} key: AWS_ACCESS_KEY_ID - name: AWS_SECRET_ACCESS_KEY valueFrom: secretKeyRef: name: {{ .Values.broker.storageOffload.secret }} key: AWS_SECRET_ACCESS_KEY {{- end }} {{- if and .Values.broker.storageOffload (eq .Values.broker.storageOffload.driver "azureblob") }} - name: AZURE_STORAGE_ACCOUNT valueFrom: secretKeyRef: name: {{ .Values.broker.storageOffload.secret }} key: AZURE_STORAGE_ACCOUNT - name: AZURE_STORAGE_ACCESS_KEY valueFrom: secretKeyRef: name: {{ .Values.broker.storageOffload.secret }} key: AZURE_STORAGE_ACCESS_KEY {{- end }} {{- if .Values.broker.extraEnvs }} {{- toYaml .Values.broker.extraEnvs | nindent 10 }} {{- end }} volumes: {{- if .Values.broker.extraVolumes }} {{ toYaml .Values.broker.extraVolumes | indent 6 }} {{- end }} {{- if .Values.auth.authentication.enabled }} {{- if eq .Values.auth.authentication.provider "jwt" }} - name: token-keys secret: {{- if not .Values.auth.authentication.jwt.usingSecretKey }} secretName: "{{ .Release.Name }}-token-asymmetric-key" {{- end}} {{- if .Values.auth.authentication.jwt.usingSecretKey }} secretName: "{{ .Release.Name }}-token-symmetric-key" {{- end}} items: {{- if .Values.auth.authentication.jwt.usingSecretKey }} - key: SECRETKEY path: token/secret.key {{- else }} - key: PUBLICKEY path: token/public.key {{- end}} - name: broker-token secret: secretName: "{{ .Release.Name }}-token-{{ .Values.auth.superUsers.broker }}" items: - key: TOKEN path: broker/token {{- end}} {{- end}} {{- if .Values.broker.storageOffload.driver }} {{- if eq .Values.broker.storageOffload.driver "google-cloud-storage" }} - name: gcp-service-account secret: secretName: {{ .Values.broker.storageOffload.gcsServiceAccountSecret }} {{- end }} {{- end }} {{- include "pulsar.broker.certs.volumes" . | nindent 6 }} {{- include "pulsar.imagePullSecrets" . | nindent 6}} {{- end }}