attached-logging-monitoring/logging/aggregator.yaml

# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# [START anthos_logging_aggregator_serviceaccount_stackdriver_log_aggregator]
# Service account for the Log Aggregator (Fluentd).
apiVersion: v1
kind: ServiceAccount
metadata:
  name: stackdriver-log-aggregator
  namespace: kube-system
# [END anthos_logging_aggregator_serviceaccount_stackdriver_log_aggregator]
---
# [START anthos_logging_aggregator_service_stackdriver_log_aggregator_in_forward]
# Service to expose the in_forward plugin port on the Log Aggregator (Fluentd)
# side.
kind: Service
apiVersion: v1
metadata:
  name: stackdriver-log-aggregator-in-forward
  namespace: kube-system
  labels:
    app: stackdriver-log-aggregator-in-forward
spec:
  ports:
  - name: in-forward
    port: 8989
    protocol: TCP
    targetPort: 8989
  # Avoid any session affinity to make sure traffic can be redirected to newly
  # scaled-up Log Aggregator (Fluentd) Pods.
  sessionAffinity: None
  selector:
    app: stackdriver-log-aggregator
# [END anthos_logging_aggregator_service_stackdriver_log_aggregator_in_forward]
---
# [START anthos_logging_aggregator_networkpolicy_stackdriver_log_aggregator_in_forward]
# Network policy to allow only the Log Forwarder (Fluent Bit) to talk to the
# Log Aggregator (Fluentd).
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: stackdriver-log-aggregator-in-forward
  namespace: kube-system
spec:
  podSelector:
    matchLabels:
      app: stackdriver-log-aggregator
  policyTypes:
  - Ingress
  ingress:
  - from:
    - namespaceSelector:
        matchLabels:
          name: kube-system
    - podSelector:
        matchLabels:
          app: stackdriver-log-forwarder
    ports:
    - protocol: TCP
      port: 8989
# [END anthos_logging_aggregator_networkpolicy_stackdriver_log_aggregator_in_forward]
---
# [START anthos_logging_aggregator_networkpolicy_stackdriver_log_aggregator_prometheus_scrape]
# Network policy to allow traffic from the Prometheus scraper into the Log
# Aggregator (Fluentd).
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: stackdriver-log-aggregator-prometheus-scrape
  namespace: kube-system
spec:
  podSelector:
    matchLabels:
      app: stackdriver-log-aggregator
  policyTypes:
  - Ingress
  ingress:
  - from:
    - namespaceSelector:
        matchLabels:
          name: kube-system
    - podSelector:
        matchLabels:
          app: stackdriver-prometheus-k8s
    ports:
    - protocol: TCP
      port: 24231
    - protocol: TCP
      port: 24232
    - protocol: TCP
      port: 24233
    - protocol: TCP
      port: 24234
    - protocol: TCP
      port: 24235
    - protocol: TCP
      port: 24236
    - protocol: TCP
      port: 24237
    - protocol: TCP
      port: 24238
    - protocol: TCP
      port: 24239
    - protocol: TCP
      port: 24240
# [END anthos_logging_aggregator_networkpolicy_stackdriver_log_aggregator_prometheus_scrape]
---
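# For reference, the Log Forwarder reaches the in_forward Service above
# through its cluster DNS name. A minimal sketch of the matching Fluent Bit
# output section, assuming the forwarder uses Fluent Bit's built-in "forward"
# output plugin (the Match pattern is a hypothetical placeholder):
#
#   [OUTPUT]
#       Name   forward
#       Match  *
#       Host   stackdriver-log-aggregator-in-forward.kube-system.svc.cluster.local
#       Port   8989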
# [START anthos_logging_aggregator_statefulset_stackdriver_log_aggregator]
# Log Aggregator (Fluentd) StatefulSet to send log entries to Stackdriver.
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: stackdriver-log-aggregator
  namespace: kube-system
  labels:
    app: stackdriver-log-aggregator
spec:
  replicas: 2
  selector:
    matchLabels:
      app: stackdriver-log-aggregator
      managed-by: stackdriver
  serviceName: stackdriver-log-aggregator-in-forward
  # Persistent volume for Stackdriver Aggregator (Fluentd) logs and buffer
  # files.
  volumeClaimTemplates:
  - metadata:
      name: stackdriver-log-aggregator-persistent-volume-claim
      namespace: kube-system
      labels:
        app: stackdriver-log-aggregator
    spec:
      # Uncomment the storage class that matches your platform:
      # storageClassName: standard  # GCP
      # storageClassName: gp2       # AWS EKS
      # storageClassName: default   # Azure AKS
      accessModes:
      - ReadWriteOnce
      resources:
        requests:
          storage: 100Gi
  template:
    metadata:
      labels:
        app: stackdriver-log-aggregator
        managed-by: stackdriver
    spec:
      serviceAccountName: stackdriver-log-aggregator
      nodeSelector:
        kubernetes.io/os: linux
      containers:
      - name: stackdriver-log-aggregator
        image: gcr.io/stackdriver-agents/stackdriver-logging-agent:1.8.4
        imagePullPolicy: IfNotPresent
        env:
        - name: GOOGLE_APPLICATION_CREDENTIALS
          value: /google-cloud-credentials/credentials.json
        resources:
          # TODO: Make these values customizable by users.
          limits:
            memory: 2000Mi
          requests:
            cpu: 500m
            memory: 1000Mi
        terminationMessagePath: /dev/termination-log
        terminationMessagePolicy: File
        ports:
        - name: in-forward
          containerPort: 8989
        - name: worker0
          containerPort: 24231
        - name: worker1
          containerPort: 24232
        - name: worker2
          containerPort: 24233
        - name: worker3
          containerPort: 24234
        - name: worker4
          containerPort: 24235
        - name: worker5
          containerPort: 24236
        - name: worker6
          containerPort: 24237
        - name: worker7
          containerPort: 24238
        - name: worker8
          containerPort: 24239
        - name: worker9
          containerPort: 24240
        # The liveness probe is aimed at situations where Fluentd silently
        # hangs for no apparent reason until a manual restart. The idea of
        # this probe is that if Fluentd is not queueing or flushing chunks
        # for 5 minutes, something is not right. If you change the Fluentd
        # configuration to reduce the amount of logs Fluentd collects,
        # consider raising the threshold or turning the liveness probe off
        # completely.
        livenessProbe:
          initialDelaySeconds: 600
          periodSeconds: 60
          exec:
            command:
            - '/bin/sh'
            - '-c'
            - >
              LIVENESS_THRESHOLD_SECONDS=${LIVENESS_THRESHOLD_SECONDS:-300};
              STUCK_THRESHOLD_SECONDS=${STUCK_THRESHOLD_SECONDS:-900};
              if [ ! -e /stackdriver-log-aggregator-persistent-volume ]; then exit 1; fi;
              mkdir -p /tmp;
              touch -d "${STUCK_THRESHOLD_SECONDS} seconds ago" /tmp/marker-stuck;
              if [ -z "$(find /stackdriver-log-aggregator-persistent-volume -type d -newer /tmp/marker-stuck -print -quit)" ]; then rm -rf /stackdriver-log-aggregator-persistent-volume/*; exit 1; fi;
              touch -d "${LIVENESS_THRESHOLD_SECONDS} seconds ago" /tmp/marker-liveness;
              if [ -z "$(find /stackdriver-log-aggregator-persistent-volume -type d -newer /tmp/marker-liveness -print -quit)" ]; then exit 1; fi;
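        # The probe thresholds above default to 300s and 900s via shell
        # parameter expansion, so they can be raised without editing the
        # script. A minimal sketch of overriding them through the container
        # environment, next to GOOGLE_APPLICATION_CREDENTIALS above
        # (hypothetical values):
        #
        #   env:
        #   - name: LIVENESS_THRESHOLD_SECONDS
        #     value: "600"
        #   - name: STUCK_THRESHOLD_SECONDS
        #     value: "1800"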
        volumeMounts:
        - mountPath: /var/log
          name: varlog
          readOnly: true
        # Overwrite the default Fluentd config at:
        # /etc/google-fluentd/google-fluentd.conf.
        - mountPath: /etc/google-fluentd/google-fluentd.conf
          subPath: google-fluentd.conf
          name: output-config-volume
        - mountPath: /etc/google-fluentd/config.d
          name: input-config-volume
        - mountPath: /google-cloud-credentials
          name: google-cloud-credentials
        - mountPath: /stackdriver-log-aggregator-persistent-volume
          name: stackdriver-log-aggregator-persistent-volume-claim
      restartPolicy: Always
      schedulerName: default-scheduler
      securityContext: {}
      tolerations:
      - key: "CriticalAddonsOnly"
        operator: "Exists"
      - key: node-role.gke.io/observability
        effect: NoSchedule
      volumes:
      - hostPath:
          path: /var/log
          type: ""
        name: varlog
      - configMap:
          defaultMode: 420
          name: stackdriver-log-aggregator-output-config
        name: output-config-volume
      - configMap:
          defaultMode: 420
          name: stackdriver-log-aggregator-input-config
        name: input-config-volume
      - secret:
          defaultMode: 420
          secretName: google-cloud-credentials
        name: google-cloud-credentials
# [END anthos_logging_aggregator_statefulset_stackdriver_log_aggregator]
---
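# The `google-cloud-credentials` Secret mounted above must exist before the
# Pods can start. A minimal sketch of creating it from a service account key
# file (the local key path is a hypothetical placeholder; the key name must be
# credentials.json to match GOOGLE_APPLICATION_CREDENTIALS):
#
#   kubectl create secret generic google-cloud-credentials \
#     --namespace kube-system \
#     --from-file=credentials.json=/path/to/key.json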
# [START anthosconfig_logging_aggregator_configmap_stackdriver_log_aggregator_input_config]
# Config map for Log Aggregator (Fluentd) input and corresponding filter
# plugins.
apiVersion: v1
kind: ConfigMap
metadata:
  name: stackdriver-log-aggregator-input-config
  namespace: kube-system
  labels:
    app: stackdriver-log-aggregator
data:
  forward.input.conf: |-
    <source>
      @type forward
      port 8989
      bind 0.0.0.0
    </source>

    # TODO: Explore whether some of this logic could be moved to the Log
    # Forwarder side. https://docs.fluentbit.io/manual/filter/lua might be
    # the best bet. All other filter plugins did not work as of Mar 2019
    # (already tried). Lua did not seem to work well with the Fluent Bit
    # "kubernetes" filter (it failed to parse the logs after they had been
    # processed by the "kubernetes" filter), but we did not try very hard
    # either. Worth investigating at some point.
    <match k8s_container.**>
      @type record_modifier
      <record>
        # Extract local_resource_id from the tag for the 'k8s_container'
        # monitored resource. The format is:
        # 'k8s_container.<namespace_name>.<pod_name>.<container_name>'.
        "logging.googleapis.com/local_resource_id" ${tag}
        # - Rename the 'log' field to the more generic 'message'. This lets
        #   fluent-plugin-google-cloud know to flatten the field as
        #   'textPayload' instead of 'jsonPayload' after extracting 'time',
        #   'severity' and 'stream' from the record.
        # - Trim entries which exceed 100KB rather than drop them, since
        #   Stackdriver only supports entries that are up to 100KB in size.
        message ${record['log'].length > 100000 ? "[Trimmed]#{record['log'][0..100000]}..." : record['log']}
        # If 'severity' is not set, set severity to DEFAULT (i.e. unknown).
        severity ${record['severity'] || 'DEFAULT'}
        # Extract "kubernetes"->"labels" and set them as
        # "logging.googleapis.com/labels". Prefix these labels with
        # "k8s-pod/" to distinguish them from other labels and to avoid
        # label name collisions with other types of labels.
        _dummy_labels_ ${if record.is_a?(Hash) && record.has_key?('kubernetes') && record['kubernetes'].has_key?('labels') && record['kubernetes']['labels'].is_a?(Hash); then; if record["logging.googleapis.com/labels"].nil?; then; record["logging.googleapis.com/labels"] = Hash.new; end; record["logging.googleapis.com/labels"].merge!(record['kubernetes']['labels'].map{ |k, v| ["k8s-pod/#{k}", v]}.to_h); end; nil}
        # TODO: Parse 'source' from glog lines either here or in the
        # forwarder.
        # Parse the 'source' field created for glog lines into a single
        # top-level field, for proper processing by the output plugin.
        # For example, if a record includes:
        #   {"source": "handlers.go:131"},
        # then the following entry will be added to the record:
        #   {"logging.googleapis.com/sourceLocation":
        #     {"file": "handlers.go", "line": "131"}
        #   }
        _dummy_source_location_ ${if record.is_a?(Hash) && record.has_key?('source') && record['source'].include?(':'); then; source_parts = record['source'].split(':', 2); record['logging.googleapis.com/sourceLocation'] = {'file' => source_parts[0], 'line' => source_parts[1]} else; nil; end}
      </record>
      tag ${if record['stream'] == 'stderr' then 'stderr' else 'stdout' end}
      remove_keys kubernetes,log,stream,_dummy_labels_,_dummy_source_location_
    </match>
# [END anthosconfig_logging_aggregator_configmap_stackdriver_log_aggregator_input_config]
---
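# For reference, the match block above rewrites records roughly as follows
# (a hypothetical record, sketched from the rules above):
#
#   in:  tag: k8s_container.default.my-pod.my-container
#        {"log": "hello", "stream": "stderr",
#         "kubernetes": {"labels": {"app": "my-app"}}}
#   out: tag: stderr
#        {"message": "hello", "severity": "DEFAULT",
#         "logging.googleapis.com/local_resource_id": "k8s_container.default.my-pod.my-container",
#         "logging.googleapis.com/labels": {"k8s-pod/app": "my-app"}}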
# [START anthosconfig_logging_aggregator_configmap_stackdriver_log_aggregator_output_config]
# Config map for Log Aggregator (Fluentd) multi-worker settings and output
# plugins.
apiVersion: v1
kind: ConfigMap
metadata:
  name: stackdriver-log-aggregator-output-config
  namespace: kube-system
  labels:
    app: stackdriver-log-aggregator
data:
  google-fluentd.conf: |-
    @include config.d/*.conf

    <system>
      workers 10
      root_dir /stackdriver-log-aggregator-persistent-volume
    </system>

    # Each worker binds to `port` + fluent_worker_id.
    <source>
      @type prometheus
      port 24231
      <labels>
        worker_id ${worker_id}
      </labels>
    </source>

    <source>
      @type prometheus_monitor
      <labels>
        worker_id ${worker_id}
      </labels>
    </source>

    <source>
      @type prometheus_output_monitor
      <labels>
        worker_id ${worker_id}
      </labels>
    </source>

    # Do not collect Fluentd's own logs to avoid infinite loops.
    <match fluent.**>
      @type null
    </match>

    <match **>
      @type google_cloud
      @id google_cloud
      # Try to detect JSON-formatted log entries.
      detect_json true
      # Collect metrics in the Prometheus registry about plugin activity.
      enable_monitoring true
      monitoring_type prometheus
      # Allow log entries from multiple containers to be sent in the same
      # request.
      split_logs_by_tag false
      <buffer>
        # Set the buffer type to file to improve reliability and reduce
        # memory consumption.
        @type file
        # The max size of each chunk: events are written into a chunk until
        # it reaches this size. Set the chunk limit conservatively to avoid
        # exceeding the recommended chunk size of 5MB per write request.
        chunk_limit_size 512k
        # The size limit of this buffer plugin instance:
        # 10 workers * 10GB = 100GB in total.
        total_limit_size 10GB
        # Never wait more than 5 seconds before flushing logs in the
        # non-error case.
        flush_interval 5s
        # Use multiple threads for flushing chunks.
        flush_thread_count 10
        # How the output plugin behaves when its buffer queue is full.
        overflow_action drop_oldest_chunk
        # This has to be false in order to let the retry_timeout and
        # retry_max_times options take effect.
        retry_forever false
        # Seconds to wait before the next retry to flush.
        retry_wait 10s
        # The base number of exponential backoff for retries.
        retry_exponential_backoff_base 3
        # The maximum interval in seconds for exponential backoff between
        # retries while failing.
        retry_max_interval 30m
        # The maximum seconds to retry to flush while failing, until the
        # plugin discards buffer chunks.
        retry_timeout 24h
        # The wait interval grows exponentially with each failure.
        retry_type exponential_backoff
      </buffer>
      use_grpc true
      project_id [PROJECT_ID]
      k8s_cluster_name [CLUSTER_NAME]
      k8s_cluster_location [CLUSTER_LOCATION]
      adjust_invalid_timestamps false
      # The Metadata Server is not available in the on-prem world. Skip the
      # check to avoid misleading errors in the log.
      use_metadata_service false
    </match>
# [END anthosconfig_logging_aggregator_configmap_stackdriver_log_aggregator_output_config]
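# Before applying this file, the [PROJECT_ID], [CLUSTER_NAME] and
# [CLUSTER_LOCATION] placeholders above must be replaced with real values.
# A minimal sketch (my-project, my-cluster and us-central1 are hypothetical
# examples):
#
#   sed -i 's/\[PROJECT_ID\]/my-project/; s/\[CLUSTER_NAME\]/my-cluster/; s/\[CLUSTER_LOCATION\]/us-central1/' aggregator.yaml
#   kubectl apply -f aggregator.yaml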