charts/pulsar/templates/broker-statefulset.yaml (342 lines of code) (raw):
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
{{- if .Values.components.broker }}
apiVersion: apps/v1
kind: StatefulSet
metadata:
{{- $stsName := printf "%s-%s" (include "pulsar.fullname" .) .Values.broker.component }}
name: {{ $stsName | quote }}
{{- $namespace := include "pulsar.namespace" . }}
namespace: {{ $namespace | quote }}
annotations: {{ .Values.broker.appAnnotations | toYaml | nindent 4 }}
labels:
{{- include "pulsar.standardLabels" . | nindent 4 }}
component: {{ .Values.broker.component }}
spec:
serviceName: "{{ template "pulsar.fullname" . }}-{{ .Values.broker.component }}"
{{- if not .Values.broker.autoscaling.enabled }}
replicas: {{ .Values.broker.replicaCount }}
{{- end }}
selector:
matchLabels:
{{- include "pulsar.matchLabels" . | nindent 6 }}
component: {{ .Values.broker.component }}
updateStrategy:
type: RollingUpdate
{{- /*
When functions are enabled, podManagementPolicy must be OrderedReady to ensure that other started brokers are available via DNS
for the function worker to connect to.
Since podManagementPolicy is immutable, this rule is only applied when the broker is first installed.
*/}}
{{- $stsObj := lookup "apps/v1" "StatefulSet" $namespace $stsName }}
{{- if $stsObj }}
podManagementPolicy: {{ $stsObj.spec.podManagementPolicy }}
{{- else }}
{{- if .Values.broker.podManagementPolicy }}
podManagementPolicy: {{ .Values.broker.podManagementPolicy }}
{{- else if not .Values.components.functions }}
podManagementPolicy: Parallel
{{- else }}
podManagementPolicy: OrderedReady
{{- end }}
{{- end }}
template:
metadata:
labels:
{{- include "pulsar.template.labels" . | nindent 8 }}
component: {{ .Values.broker.component }}
annotations:
{{- if not .Values.broker.podMonitor.enabled }}
prometheus.io/scrape: "true"
prometheus.io/port: "{{ .Values.broker.ports.http }}"
{{- end }}
{{- if .Values.broker.restartPodsOnConfigMapChange }}
checksum/config: {{ include (print $.Template.BasePath "/broker-configmap.yaml") . | sha256sum }}
{{- end }}
{{- with .Values.broker.annotations }}
{{ toYaml . | indent 8 }}
{{- end }}
spec:
serviceAccountName: "{{ template "pulsar.fullname" . }}-{{ .Values.broker.component }}-acct"
{{- if .Values.broker.nodeSelector }}
nodeSelector:
{{ toYaml .Values.broker.nodeSelector | indent 8 }}
{{- end }}
{{- if .Values.broker.tolerations }}
tolerations:
{{ toYaml .Values.broker.tolerations | indent 8 }}
{{- end }}
{{- if .Values.broker.topologySpreadConstraints }}
topologySpreadConstraints:
{{- toYaml .Values.broker.topologySpreadConstraints | nindent 8 }}
{{- end }}
affinity:
{{- if and .Values.affinity.anti_affinity .Values.broker.affinity.anti_affinity}}
podAntiAffinity:
{{- if eq .Values.broker.affinity.type "requiredDuringSchedulingIgnoredDuringExecution"}}
{{ .Values.broker.affinity.type }}:
- labelSelector:
matchExpressions:
- key: "app"
operator: In
values:
- "{{ template "pulsar.name" . }}"
- key: "release"
operator: In
values:
- {{ .Release.Name }}
- key: "component"
operator: In
values:
- {{ .Values.broker.component }}
topologyKey: {{ .Values.broker.affinity.anti_affinity_topology_key }}
{{- else }}
{{ .Values.broker.affinity.type }}:
- weight: 100
podAffinityTerm:
labelSelector:
matchExpressions:
- key: "app"
operator: In
values:
- "{{ template "pulsar.name" . }}"
- key: "release"
operator: In
values:
- {{ .Release.Name }}
- key: "component"
operator: In
values:
- {{ .Values.broker.component }}
topologyKey: {{ .Values.broker.affinity.anti_affinity_topology_key }}
{{- end }}
{{- end }}
terminationGracePeriodSeconds: {{ .Values.broker.gracePeriod }}
initContainers:
{{- if and .Values.components.zookeeper .Values.broker.waitZookeeperTimeout (gt (.Values.broker.waitZookeeperTimeout | int) 0) }}
# This init container will wait for zookeeper to be ready before
# deploying the bookies
- name: wait-zookeeper-ready
image: "{{ template "pulsar.imageFullName" (dict "image" .Values.images.broker "root" .) }}"
imagePullPolicy: "{{ template "pulsar.imagePullPolicy" (dict "image" .Values.images.broker "root" .) }}"
resources: {{ toYaml .Values.initContainer.resources | nindent 10 }}
command: ["timeout", "{{ .Values.broker.waitZookeeperTimeout }}", "sh", "-c"]
args:
- |
{{- include "pulsar.broker.zookeeper.tls.settings" . | nindent 12 }}
export PULSAR_MEM="-Xmx128M";
{{- if .Values.pulsar_metadata.configurationStore }}
until timeout 15 bin/pulsar zookeeper-shell -server {{ template "pulsar.configurationStore.connect" . }} get {{ .Values.configurationStoreMetadataPrefix }}/admin/clusters/{{ template "pulsar.cluster.name" . }}; do
{{- end }}
{{- if not .Values.pulsar_metadata.configurationStore }}
until timeout 15 bin/pulsar zookeeper-shell -server {{ template "pulsar.zookeeper.connect" . }} get {{ .Values.metadataPrefix }}/admin/clusters/{{ template "pulsar.cluster.name" . }}; do
{{- end }}
echo "pulsar cluster {{ template "pulsar.cluster.name" . }} isn't initialized yet ... check in 3 seconds ..." && sleep 3;
done;
volumeMounts:
{{- include "pulsar.broker.certs.volumeMounts" . | nindent 8 }}
{{- end }}
{{- if and .Values.components.oxia .Values.broker.waitOxiaTimeout (gt (.Values.broker.waitOxiaTimeout | int) 0) }}
- name: wait-oxia-ready
image: "{{ template "pulsar.imageFullName" (dict "image" .Values.images.broker "root" .) }}"
imagePullPolicy: "{{ template "pulsar.imagePullPolicy" (dict "image" .Values.images.broker "root" .) }}"
resources: {{ toYaml .Values.initContainer.resources | nindent 10 }}
command: ["timeout", "{{ .Values.broker.waitOxiaTimeout }}", "sh", "-c"]
args:
- |
until nslookup {{ template "pulsar.oxia.server.service" . }}; do
sleep 3;
done;
{{- end }}
{{- if and .Values.broker.waitBookkeeperTimeout (gt (.Values.broker.waitBookkeeperTimeout | int) 0) }}
# This init container will wait for bookkeeper to be ready before
# deploying the broker
- name: wait-bookkeeper-ready
image: "{{ template "pulsar.imageFullName" (dict "image" .Values.images.broker "root" .) }}"
imagePullPolicy: "{{ template "pulsar.imagePullPolicy" (dict "image" .Values.images.broker "root" .) }}"
resources: {{ toYaml .Values.initContainer.resources | nindent 10 }}
command: ["timeout", "{{ .Values.broker.waitBookkeeperTimeout }}", "sh", "-c"]
args:
- |
{{- include "pulsar.broker.zookeeper.tls.settings" . | nindent 12 }}
bin/apply-config-from-env.py conf/bookkeeper.conf;
export BOOKIE_MEM="-Xmx128M";
until timeout 15 bin/bookkeeper shell whatisinstanceid; do
echo "bookkeeper cluster is not initialized yet. backoff for 3 seconds ...";
sleep 3;
done;
echo "bookkeeper cluster is already initialized";
bookieServiceNumber="$(nslookup -timeout=10 {{ template "pulsar.fullname" . }}-{{ .Values.bookkeeper.component }} | grep Name | wc -l)";
until [ ${bookieServiceNumber} -ge {{ .Values.broker.configData.managedLedgerDefaultEnsembleSize }} ]; do
echo "bookkeeper cluster {{ template "pulsar.cluster.name" . }} isn't ready yet ... check in 10 seconds ...";
sleep 10;
bookieServiceNumber="$(nslookup -timeout=10 {{ template "pulsar.fullname" . }}-{{ .Values.bookkeeper.component }} | grep Name | wc -l)";
done;
echo "bookkeeper cluster is ready";
envFrom:
- configMapRef:
name: "{{ template "pulsar.fullname" . }}-{{ .Values.bookkeeper.component }}"
volumeMounts:
{{- include "pulsar.broker.certs.volumeMounts" . | nindent 10 }}
{{- end }}
{{- if .Values.broker.initContainers }}
{{- toYaml .Values.broker.initContainers | nindent 6 }}
{{- end }}
containers:
- name: "{{ template "pulsar.fullname" . }}-{{ .Values.broker.component }}"
image: "{{ template "pulsar.imageFullName" (dict "image" .Values.images.broker "root" .) }}"
imagePullPolicy: "{{ template "pulsar.imagePullPolicy" (dict "image" .Values.images.broker "root" .) }}"
{{- if .Values.broker.probe.liveness.enabled }}
livenessProbe:
httpGet:
path: /status.html
port: {{ .Values.broker.ports.http }}
initialDelaySeconds: {{ .Values.broker.probe.liveness.initialDelaySeconds }}
periodSeconds: {{ .Values.broker.probe.liveness.periodSeconds }}
timeoutSeconds: {{ .Values.broker.probe.liveness.timeoutSeconds }}
failureThreshold: {{ .Values.broker.probe.liveness.failureThreshold }}
{{- end }}
{{- if .Values.broker.probe.readiness.enabled }}
readinessProbe:
httpGet:
path: /status.html
port: {{ .Values.broker.ports.http }}
initialDelaySeconds: {{ .Values.broker.probe.readiness.initialDelaySeconds }}
periodSeconds: {{ .Values.broker.probe.readiness.periodSeconds }}
timeoutSeconds: {{ .Values.broker.probe.readiness.timeoutSeconds }}
failureThreshold: {{ .Values.broker.probe.readiness.failureThreshold }}
{{- end }}
{{- if .Values.broker.probe.startup.enabled }}
startupProbe:
httpGet:
path: /status.html
port: {{ .Values.broker.ports.http }}
initialDelaySeconds: {{ .Values.broker.probe.startup.initialDelaySeconds }}
periodSeconds: {{ .Values.broker.probe.startup.periodSeconds }}
timeoutSeconds: {{ .Values.broker.probe.startup.timeoutSeconds }}
failureThreshold: {{ .Values.broker.probe.startup.failureThreshold }}
{{- end }}
{{- if .Values.broker.resources }}
resources:
{{ toYaml .Values.broker.resources | indent 10 }}
{{- end }}
command: ["sh", "-c"]
args:
- |
{{- if .Values.broker.additionalCommand }}
{{ .Values.broker.additionalCommand }}
{{- end }}
bin/apply-config-from-env.py conf/broker.conf;
bin/gen-yml-from-env.py conf/functions_worker.yml;
echo "OK" > "${statusFilePath:-status}";
{{- if .Values.components.zookeeper }}
{{- include "pulsar.broker.zookeeper.tls.settings" . | nindent 10 }}
timeout 15 bin/pulsar zookeeper-shell -server {{ template "pulsar.zookeeper.connect" . }} get {{ template "pulsar.broker.znode" . }};
while [ $? -eq 0 ]; do
echo "broker {{ template "pulsar.broker.hostname" . }} znode still exists ... check in 10 seconds ...";
sleep 10;
timeout 15 bin/pulsar zookeeper-shell -server {{ template "pulsar.zookeeper.connect" . }} get {{ template "pulsar.broker.znode" . }};
done;
{{- end }}
cat conf/pulsar_env.sh;
OPTS="${OPTS} -Dlog4j2.formatMsgNoLookups=true" exec bin/pulsar broker;
ports:
# prometheus needs to access /metrics endpoint
- name: http
containerPort: {{ .Values.broker.ports.http }}
{{- if or (not .Values.tls.enabled) (not .Values.tls.broker.enabled) }}
- name: "{{ .Values.tcpPrefix }}pulsar"
containerPort: {{ .Values.broker.ports.pulsar }}
{{- end }}
{{- if and .Values.tls.enabled .Values.tls.broker.enabled }}
- name: https
containerPort: {{ .Values.broker.ports.https }}
- name: "{{ .Values.tlsPrefix }}pulsarssl"
containerPort: {{ .Values.broker.ports.pulsarssl }}
{{- end }}
envFrom:
- configMapRef:
name: "{{ template "pulsar.fullname" . }}-{{ .Values.broker.component }}"
volumeMounts:
{{- if .Values.auth.authentication.enabled }}
{{- if eq .Values.auth.authentication.provider "jwt" }}
- mountPath: "/pulsar/keys"
name: token-keys
readOnly: true
- mountPath: "/pulsar/tokens"
name: broker-token
readOnly: true
{{- end }}
{{- end }}
{{- if .Values.broker.storageOffload.driver }}
{{- if eq .Values.broker.storageOffload.driver "google-cloud-storage" }}
- name: gcp-service-account
readOnly: true
mountPath: /pulsar/gcp-service-account
{{- end }}
{{- end }}
{{- if .Values.broker.extraVolumeMounts }}
{{ toYaml .Values.broker.extraVolumeMounts | indent 10 }}
{{- end }}
{{- include "pulsar.broker.certs.volumeMounts" . | nindent 10 }}
env:
{{- if and (and .Values.broker.storageOffload (eq .Values.broker.storageOffload.driver "aws-s3")) .Values.broker.storageOffload.secret }}
- name: AWS_ACCESS_KEY_ID
valueFrom:
secretKeyRef:
name: {{ .Values.broker.storageOffload.secret }}
key: AWS_ACCESS_KEY_ID
- name: AWS_SECRET_ACCESS_KEY
valueFrom:
secretKeyRef:
name: {{ .Values.broker.storageOffload.secret }}
key: AWS_SECRET_ACCESS_KEY
{{- end }}
{{- if and .Values.broker.storageOffload (eq .Values.broker.storageOffload.driver "azureblob") }}
- name: AZURE_STORAGE_ACCOUNT
valueFrom:
secretKeyRef:
name: {{ .Values.broker.storageOffload.secret }}
key: AZURE_STORAGE_ACCOUNT
- name: AZURE_STORAGE_ACCESS_KEY
valueFrom:
secretKeyRef:
name: {{ .Values.broker.storageOffload.secret }}
key: AZURE_STORAGE_ACCESS_KEY
{{- end }}
{{- if .Values.broker.extraEnvs }}
{{- toYaml .Values.broker.extraEnvs | nindent 10 }}
{{- end }}
volumes:
{{- if .Values.broker.extraVolumes }}
{{ toYaml .Values.broker.extraVolumes | indent 6 }}
{{- end }}
{{- if .Values.auth.authentication.enabled }}
{{- if eq .Values.auth.authentication.provider "jwt" }}
- name: token-keys
secret:
{{- if not .Values.auth.authentication.jwt.usingSecretKey }}
secretName: "{{ .Release.Name }}-token-asymmetric-key"
{{- end}}
{{- if .Values.auth.authentication.jwt.usingSecretKey }}
secretName: "{{ .Release.Name }}-token-symmetric-key"
{{- end}}
items:
{{- if .Values.auth.authentication.jwt.usingSecretKey }}
- key: SECRETKEY
path: token/secret.key
{{- else }}
- key: PUBLICKEY
path: token/public.key
{{- end}}
- name: broker-token
secret:
secretName: "{{ .Release.Name }}-token-{{ .Values.auth.superUsers.broker }}"
items:
- key: TOKEN
path: broker/token
{{- end}}
{{- end}}
{{- if .Values.broker.storageOffload.driver }}
{{- if eq .Values.broker.storageOffload.driver "google-cloud-storage" }}
- name: gcp-service-account
secret:
secretName: {{ .Values.broker.storageOffload.gcsServiceAccountSecret }}
{{- end }}
{{- end }}
{{- include "pulsar.broker.certs.volumes" . | nindent 6 }}
{{- include "pulsar.imagePullSecrets" . | nindent 6}}
{{- end }}