# attached-logging-monitoring/logging/aggregator.yaml
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# [START anthos_logging_aggregator_serviceaccount_stackdriver_log_aggregator]
# Service account for Log Aggregator (Fluentd).
apiVersion: v1
kind: ServiceAccount
metadata:
  name: stackdriver-log-aggregator
  namespace: kube-system
# [END anthos_logging_aggregator_serviceaccount_stackdriver_log_aggregator]
---
# [START anthos_logging_aggregator_service_stackdriver_log_aggregator_in_forward]
# Service to expose the in_forward plugin port on the Log Aggregator (Fluentd) side.
kind: Service
apiVersion: v1
metadata:
  name: stackdriver-log-aggregator-in-forward
  namespace: kube-system
  labels:
    app: stackdriver-log-aggregator-in-forward
spec:
  ports:
  - name: in-forward
    port: 8989
    protocol: TCP
    targetPort: 8989
  # Avoid any session affinity to make sure traffic can be redirected to
  # newly scaled-up Log Aggregator (Fluentd) replicas.
  sessionAffinity: None
  selector:
    app: stackdriver-log-aggregator
# [END anthos_logging_aggregator_service_stackdriver_log_aggregator_in_forward]
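# For reference, a minimal sketch of a Log Forwarder (Fluent Bit) output
# section that targets this Service over cluster DNS (the host and port must
# match the Service above; the actual forwarder config lives elsewhere):
#
#   [OUTPUT]
#       Name   forward
#       Match  *
#       Host   stackdriver-log-aggregator-in-forward.kube-system.svc.cluster.local
#       Port   8989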
---
# [START anthos_logging_aggregator_networkpolicy_stackdriver_log_aggregator_in_forward]
# Network policy to allow only the Log Forwarder (Fluent Bit) to talk to the
# Log Aggregator (Fluentd).
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: stackdriver-log-aggregator-in-forward
  namespace: kube-system
spec:
  podSelector:
    matchLabels:
      app: stackdriver-log-aggregator
  policyTypes:
  - Ingress
  ingress:
  - from:
    - namespaceSelector:
        matchLabels:
          name: kube-system
    - podSelector:
        matchLabels:
          app: stackdriver-log-forwarder
    ports:
    - protocol: TCP
      port: 8989
# [END anthos_logging_aggregator_networkpolicy_stackdriver_log_aggregator_in_forward]
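# Note that the two entries under `from` above are separate list items and
# are therefore OR'ed: the rule admits any pod in a namespace labeled
# name=kube-system, as well as any pod in this namespace labeled
# app=stackdriver-log-forwarder.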
---
# [START anthos_logging_aggregator_networkpolicy_stackdriver_log_aggregator_prometheus_scrape]
# Network policy to allow traffic from the Prometheus scraper into the Log
# Aggregator (Fluentd).
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: stackdriver-log-aggregator-prometheus-scrape
  namespace: kube-system
spec:
  podSelector:
    matchLabels:
      app: stackdriver-log-aggregator
  policyTypes:
  - Ingress
  ingress:
  - from:
    - namespaceSelector:
        matchLabels:
          name: kube-system
    - podSelector:
        matchLabels:
          app: stackdriver-prometheus-k8s
    ports:
    - protocol: TCP
      port: 24231
    - protocol: TCP
      port: 24232
    - protocol: TCP
      port: 24233
    - protocol: TCP
      port: 24234
    - protocol: TCP
      port: 24235
    - protocol: TCP
      port: 24236
    - protocol: TCP
      port: 24237
    - protocol: TCP
      port: 24238
    - protocol: TCP
      port: 24239
    - protocol: TCP
      port: 24240
# [END anthos_logging_aggregator_networkpolicy_stackdriver_log_aggregator_prometheus_scrape]
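# The ten ports above correspond to the ten Fluentd workers configured in the
# output ConfigMap below: each worker exposes Prometheus metrics on port
# 24231 + worker_id.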
---
# [START anthos_logging_aggregator_statefulset_stackdriver_log_aggregator]
# Log Aggregator (Fluentd) StatefulSet to send log entries to Stackdriver.
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: stackdriver-log-aggregator
  namespace: kube-system
  labels:
    app: stackdriver-log-aggregator
spec:
  replicas: 2
  selector:
    matchLabels:
      app: stackdriver-log-aggregator
      managed-by: stackdriver
  serviceName: stackdriver-log-aggregator-in-forward
  # Persistent volume for Log Aggregator (Fluentd) logs and buffer files.
  volumeClaimTemplates:
  - metadata:
      name: stackdriver-log-aggregator-persistent-volume-claim
      namespace: kube-system
      labels:
        app: stackdriver-log-aggregator
    spec:
      # storageClassName: standard # GCP
      # storageClassName: gp2 # AWS EKS
      # storageClassName: default # Azure AKS
      accessModes:
      - ReadWriteOnce
      resources:
        requests:
          storage: 100Gi
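      # Uncomment the storageClassName above that matches your platform; the
      # storage classes available in the cluster can be listed with:
      #   kubectl get storageclass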
  template:
    metadata:
      labels:
        app: stackdriver-log-aggregator
        managed-by: stackdriver
    spec:
      serviceAccountName: stackdriver-log-aggregator
      nodeSelector:
        kubernetes.io/os: linux
      containers:
      - name: stackdriver-log-aggregator
        image: gcr.io/stackdriver-agents/stackdriver-logging-agent:1.8.4
        imagePullPolicy: IfNotPresent
        env:
        - name: GOOGLE_APPLICATION_CREDENTIALS
          value: /google-cloud-credentials/credentials.json
        resources:
          # TODO: Make these values customizable by users.
          limits:
            memory: 2000Mi
          requests:
            cpu: 500m
            memory: 1000Mi
        terminationMessagePath: /dev/termination-log
        terminationMessagePolicy: File
        ports:
        - name: in-forward
          containerPort: 8989
        - name: worker0
          containerPort: 24231
        - name: worker1
          containerPort: 24232
        - name: worker2
          containerPort: 24233
        - name: worker3
          containerPort: 24234
        - name: worker4
          containerPort: 24235
        - name: worker5
          containerPort: 24236
        - name: worker6
          containerPort: 24237
        - name: worker7
          containerPort: 24238
        - name: worker8
          containerPort: 24239
        - name: worker9
          containerPort: 24240
        # The liveness probe helps in situations where fluentd silently hangs
        # for no apparent reason until it is manually restarted. The idea of
        # this probe is that if fluentd has not queued or flushed chunks for
        # 5 minutes, something is not right. If you change the fluentd
        # configuration to reduce the amount of logs fluentd collects,
        # consider changing the threshold or turning the liveness probe off
        # completely.
        livenessProbe:
          initialDelaySeconds: 600
          periodSeconds: 60
          exec:
            command:
            - '/bin/sh'
            - '-c'
            - >
              LIVENESS_THRESHOLD_SECONDS=${LIVENESS_THRESHOLD_SECONDS:-300};
              STUCK_THRESHOLD_SECONDS=${STUCK_THRESHOLD_SECONDS:-900};
              if [ ! -e /stackdriver-log-aggregator-persistent-volume ];
              then
                exit 1;
              fi;
              mkdir -p /tmp;
              touch -d "${STUCK_THRESHOLD_SECONDS} seconds ago" /tmp/marker-stuck;
              if [ -z "$(find /stackdriver-log-aggregator-persistent-volume -type d -newer /tmp/marker-stuck -print -quit)" ];
              then
                rm -rf /stackdriver-log-aggregator-persistent-volume/*;
                exit 1;
              fi;
              touch -d "${LIVENESS_THRESHOLD_SECONDS} seconds ago" /tmp/marker-liveness;
              if [ -z "$(find /stackdriver-log-aggregator-persistent-volume -type d -newer /tmp/marker-liveness -print -quit)" ];
              then
                exit 1;
              fi;
        volumeMounts:
        - mountPath: /var/log
          name: varlog
          readOnly: true
        # Overwrite the default Fluentd config at:
        # /etc/google-fluentd/google-fluentd.conf.
        - mountPath: /etc/google-fluentd/google-fluentd.conf
          subPath: google-fluentd.conf
          name: output-config-volume
        - mountPath: /etc/google-fluentd/config.d
          name: input-config-volume
        - mountPath: /google-cloud-credentials
          name: google-cloud-credentials
        - mountPath: /stackdriver-log-aggregator-persistent-volume
          name: stackdriver-log-aggregator-persistent-volume-claim
      restartPolicy: Always
      schedulerName: default-scheduler
      securityContext: {}
      tolerations:
      - key: "CriticalAddonsOnly"
        operator: "Exists"
      - key: node-role.gke.io/observability
        effect: NoSchedule
      volumes:
      - hostPath:
          path: /var/log
          type: ""
        name: varlog
      - configMap:
          defaultMode: 420
          name: stackdriver-log-aggregator-output-config
        name: output-config-volume
      - configMap:
          defaultMode: 420
          name: stackdriver-log-aggregator-input-config
        name: input-config-volume
      - secret:
          defaultMode: 420
          secretName: google-cloud-credentials
        name: google-cloud-credentials
# [END anthos_logging_aggregator_statefulset_stackdriver_log_aggregator]
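# Note: the `google-cloud-credentials` secret mounted by the StatefulSet
# above must exist before the pods can start. Assuming a service account key
# file named credentials.json in the current directory (an illustrative
# path), it could be created with:
#   kubectl create secret generic google-cloud-credentials \
#     --namespace kube-system --from-file=credentials.json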
---
# [START anthosconfig_logging_aggregator_configmap_stackdriver_log_aggregator_input_config]
# Config map for Log Aggregator (Fluentd) input and corresponding filter
# plugins.
apiVersion: v1
kind: ConfigMap
metadata:
  name: stackdriver-log-aggregator-input-config
  namespace: kube-system
  labels:
    app: stackdriver-log-aggregator
data:
  forward.input.conf: |-
    <source>
      @type forward
      port 8989
      bind 0.0.0.0
    </source>
    # TODO: Explore whether some of this logic could be moved to the Log
    # Forwarder side.
    # https://docs.fluentbit.io/manual/filter/lua might be the best bet. All
    # other filter plugins did not work as of Mar 2019 (already tried). Lua
    # did not seem to work well with the Fluent Bit "kubernetes" filter (it
    # failed to parse the logs after they had been processed by the
    # "kubernetes" filter), but we did not try very hard either. Worth
    # investigating at some point.
    <match k8s_container.**>
      @type record_modifier
      <record>
        # Extract local_resource_id from the tag for the 'k8s_container'
        # monitored resource. The tag format is:
        # 'k8s_container.<namespace_name>.<pod_name>.<container_name>'.
        "logging.googleapis.com/local_resource_id" ${tag}
        # - Rename the 'log' field to the more generic 'message'. This lets
        #   fluent-plugin-google-cloud know to flatten the field as
        #   'textPayload' instead of 'jsonPayload' after extracting 'time',
        #   'severity' and 'stream' from the record.
        # - Trim entries that exceed 100KB to avoid dropping them, since
        #   Stackdriver only supports entries that are up to 100KB in size.
        message ${record['log'].length > 100000 ? "[Trimmed]#{record['log'][0..100000]}..." : record['log']}
        # If 'severity' is not set, set severity to DEFAULT (i.e. unknown).
        severity ${record['severity'] || 'DEFAULT'}
        # Extract "kubernetes"->"labels" and set them as
        # "logging.googleapis.com/labels". Prefix these labels with
        # "k8s-pod/" to distinguish them from other types of labels and to
        # avoid label name collisions.
        _dummy_labels_ ${if record.is_a?(Hash) && record.has_key?('kubernetes') && record['kubernetes'].has_key?('labels') && record['kubernetes']['labels'].is_a?(Hash); then; if record["logging.googleapis.com/labels"].nil?; then; record["logging.googleapis.com/labels"] = Hash.new; end; record["logging.googleapis.com/labels"].merge!(record['kubernetes']['labels'].map{ |k, v| ["k8s-pod/#{k}", v]}.to_h); end; nil}
        # TODO: Parse 'source' from glog lines either here or in the
        # forwarder.
        # Parse the 'source' field created for glog lines into a single
        # top-level field, for proper processing by the output plugin.
        # For example, if a record includes:
        #   {"source":"handlers.go:131"},
        # then the following entry will be added to the record:
        #   {"logging.googleapis.com/sourceLocation":
        #     {"file":"handlers.go", "line":"131"}
        #   }
        _dummy_source_location_ ${if record.is_a?(Hash) && record.has_key?('source') && record['source'].include?(':'); then; source_parts = record['source'].split(':', 2); record['logging.googleapis.com/sourceLocation'] = {'file' => source_parts[0], 'line' => source_parts[1]} else; nil; end}
      </record>
      tag ${if record['stream'] == 'stderr' then 'stderr' else 'stdout' end}
      remove_keys kubernetes,log,stream,_dummy_labels_,_dummy_source_location_
    </match>
# [END anthosconfig_logging_aggregator_configmap_stackdriver_log_aggregator_input_config]
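# As an illustration (pod and container names are hypothetical): a log line
# from container "nginx" in pod "nginx-abc" in namespace "default" arrives
# tagged "k8s_container.default.nginx-abc.nginx", and the match block above
# copies that tag into the "logging.googleapis.com/local_resource_id" field.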
---
# [START anthosconfig_logging_aggregator_configmap_stackdriver_log_aggregator_output_config]
# Config map for Log Aggregator (Fluentd) multi worker settings and output
# plugins.
apiVersion: v1
kind: ConfigMap
metadata:
  name: stackdriver-log-aggregator-output-config
  namespace: kube-system
  labels:
    app: stackdriver-log-aggregator
data:
  google-fluentd.conf: |-
    @include config.d/*.conf
    <system>
      workers 10
      root_dir /stackdriver-log-aggregator-persistent-volume
    </system>
    # Each worker binds to `port` + fluent_worker_id.
    <source>
      @type prometheus
      port 24231
      <labels>
        worker_id ${worker_id}
      </labels>
    </source>
    <source>
      @type prometheus_monitor
      <labels>
        worker_id ${worker_id}
      </labels>
    </source>
    <source>
      @type prometheus_output_monitor
      <labels>
        worker_id ${worker_id}
      </labels>
    </source>
    # Do not collect fluentd's own logs to avoid infinite loops.
    <match fluent.**>
      @type null
    </match>
    <match **>
      @type google_cloud
      @id google_cloud
      # Try to detect JSON formatted log entries.
      detect_json true
      # Collect metrics in Prometheus registry about plugin activity.
      enable_monitoring true
      monitoring_type prometheus
      # Allow log entries from multiple containers to be sent in the same
      # request.
      split_logs_by_tag false
      <buffer>
        # Set the buffer type to file to improve reliability and reduce
        # memory consumption.
        @type file
        # The max size of each chunk: events are written into a chunk until
        # it reaches this size. Set the chunk limit conservatively to avoid
        # exceeding the recommended chunk size of 5MB per write request.
        chunk_limit_size 512k
        # The size limit of this buffer plugin instance. With 10 workers,
        # that is 10 * 10GB = 100GB in total.
        total_limit_size 10GB
        # Never wait more than 5 seconds before flushing logs in the
        # non-error case.
        flush_interval 5s
        # Use multiple threads for flushing chunks.
        flush_thread_count 10
        # How the output plugin behaves when its buffer queue is full:
        # drop the oldest chunk.
        overflow_action drop_oldest_chunk
        # This has to be false in order to let the retry_timeout and
        # retry_max_times options take effect.
        retry_forever false
        # Seconds to wait before the next retry to flush.
        retry_wait 10s
        # The base number of exponential backoff for retries.
        retry_exponential_backoff_base 3
        # The maximum interval in seconds for exponential backoff between
        # retries while failing.
        retry_max_interval 30m
        # The maximum seconds to keep retrying to flush while failing, after
        # which the plugin discards buffer chunks.
        retry_timeout 24h
        # The wait interval grows exponentially with each failure.
        retry_type exponential_backoff
      </buffer>
      use_grpc true
      project_id [PROJECT_ID]
      k8s_cluster_name [CLUSTER_NAME]
      k8s_cluster_location [CLUSTER_LOCATION]
      adjust_invalid_timestamps false
      # The Metadata Server is not available in the on-prem world. Skip the
      # check to avoid misleading errors in the log.
      use_metadata_service false
    </match>
# [END anthosconfig_logging_aggregator_configmap_stackdriver_log_aggregator_output_config]
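# Before applying this manifest, replace the [PROJECT_ID], [CLUSTER_NAME] and
# [CLUSTER_LOCATION] placeholders above with real values, for example
# (illustrative values):
#   sed -i 's/\[PROJECT_ID\]/my-project/; s/\[CLUSTER_NAME\]/my-cluster/; s/\[CLUSTER_LOCATION\]/us-central1/' aggregator.yaml
#   kubectl apply -f aggregator.yaml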