pkg/source/gcp/task/gke/k8s_node/parser.go (414 lines of code) (raw):
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package k8s_node
import (
"context"
"fmt"
"log/slog"
"strings"
"github.com/GoogleCloudPlatform/khi/pkg/inspection/logger"
"github.com/GoogleCloudPlatform/khi/pkg/log"
"github.com/GoogleCloudPlatform/khi/pkg/model/enum"
"github.com/GoogleCloudPlatform/khi/pkg/model/history"
"github.com/GoogleCloudPlatform/khi/pkg/model/history/grouper"
"github.com/GoogleCloudPlatform/khi/pkg/model/history/resourceinfo/noderesource"
"github.com/GoogleCloudPlatform/khi/pkg/model/history/resourcepath"
"github.com/GoogleCloudPlatform/khi/pkg/parser"
"github.com/GoogleCloudPlatform/khi/pkg/parser/k8s"
"github.com/GoogleCloudPlatform/khi/pkg/source/gcp/inspectiontype"
k8s_node_taskid "github.com/GoogleCloudPlatform/khi/pkg/source/gcp/task/gke/k8s_node/taskid"
"github.com/GoogleCloudPlatform/khi/pkg/task/taskid"
)
var GKENodeLogParseJob = parser.NewParserTaskFromParser(k8s_node_taskid.GKENodeLogParserTaskID, &k8sNodeParser{}, false, inspectiontype.GCPK8sClusterInspectionTypes)
const ContainerdStartingMsg = "starting containerd"
const DockerdStartingMsg = "Starting up"
const DockerdTerminatingMsg = "Daemon shutdown complete"
const ConfigureShStartingMsg = "Start to install kubernetes files"
const ConfigureShTerminatingMsg = "Done for installing kubernetes files"
const ConfigureHelperShStartingMsg = "Start to configure instance for kubernetes"
const ConfigureHelperShTerminatingMsg = "Done for the configuration for kubernetes"
type k8sNodeParser struct {
}
// TargetLogType implements parser.Parser.
func (p *k8sNodeParser) TargetLogType() enum.LogType {
return enum.LogTypeNode
}
// Description implements parser.Parser.
func (*k8sNodeParser) Description() string {
return `Gather node components(e.g docker/container) logs. Log volume can be huge when the cluster has many nodes.`
}
// GetParserName implements parser.Parser.
func (*k8sNodeParser) GetParserName() string {
return `Kubernetes Node Logs`
}
func (*k8sNodeParser) Dependencies() []taskid.UntypedTaskReference {
return []taskid.UntypedTaskReference{}
}
func (*k8sNodeParser) LogTask() taskid.TaskReference[[]*log.LogEntity] {
return k8s_node_taskid.GKENodeLogQueryTaskID.Ref()
}
func (*k8sNodeParser) Grouper() grouper.LogGrouper {
return grouper.NewSingleStringFieldKeyLogGrouper("resource.labels.node_name")
}
func (*k8sNodeParser) GetSyslogIdentifier(l *log.LogEntity) string {
syslogIdentiefier := l.GetStringOrDefault("jsonPayload.SYSLOG_IDENTIFIER", "Unknown")
if strings.HasPrefix(syslogIdentiefier, "(") && strings.HasSuffix(syslogIdentiefier, ")") { // dockerd can be "(dockerd)" in SYSLOG_IDENTIFIER field.
syslogIdentiefier = strings.TrimPrefix(strings.TrimSuffix(syslogIdentiefier, ")"), "(")
}
return syslogIdentiefier
}
// Parse implements parser.Parser.
func (p *k8sNodeParser) Parse(ctx context.Context, l *log.LogEntity, cs *history.ChangeSet, builder *history.Builder) error {
if !l.HasKLogField("") {
mainMessage, err := l.MainMessage()
if err != nil {
return err
}
cs.RecordLogSummary(mainMessage)
return nil
}
nodeName := l.GetStringOrDefault("resource.labels.node_name", "")
if nodeName == "" {
return fmt.Errorf("parser couldn't lookup the node name")
}
summary, err := parseDefaultSummary(l)
if err != nil {
return err
}
cs.RecordLogSummary(summary)
severity := enum.SeverityUnknown
mainMessage, err := l.MainMessage()
if err == nil {
severity = k8s.ExractKLogSeverity(mainMessage)
}
cs.RecordLogSeverity(severity)
supportsLifetimeParse := false
syslogIdentifier := p.GetSyslogIdentifier(l)
nodeComponentPath := resourcepath.NodeComponent(nodeName, syslogIdentifier)
if syslogIdentifier == "Unknown" {
// Check if the log is for kube-proxy. If it was true, the log event will be generated on the Pod resource.
logName := l.GetStringOrDefault("logName", "")
if strings.HasSuffix(logName, "kube-proxy") {
kubeProxyPodPath := resourcepath.Pod("kube-system", fmt.Sprintf("kube-proxy-%s", nodeName))
cs.RecordEvent(kubeProxyPodPath)
return nil
}
}
if syslogIdentifier == "containerd" {
msg, err := l.KLogField("msg")
if err != nil {
return err
}
err = p.handleContainerdSandboxLogs(ctx, l, nodeName, msg, builder, cs, summary)
if err != nil {
return err
}
if msg == ContainerdStartingMsg {
cs.RecordRevision(nodeComponentPath,
&history.StagingResourceRevision{
Verb: enum.RevisionVerbCreate,
State: enum.RevisionStateExisting,
Requestor: syslogIdentifier,
ChangeTime: l.Timestamp(),
})
}
supportsLifetimeParse = true
}
if syslogIdentifier == "dockerd" {
msg, err := l.KLogField("msg")
if err != nil {
return err
}
if msg == DockerdStartingMsg {
cs.RecordRevision(nodeComponentPath,
&history.StagingResourceRevision{
Verb: enum.RevisionVerbCreate,
State: enum.RevisionStateExisting,
Requestor: syslogIdentifier,
ChangeTime: l.Timestamp(),
})
}
if msg == DockerdTerminatingMsg {
cs.RecordRevision(nodeComponentPath,
&history.StagingResourceRevision{
Verb: enum.RevisionVerbDelete,
State: enum.RevisionStateDeleted,
Requestor: syslogIdentifier,
ChangeTime: l.Timestamp(),
})
}
supportsLifetimeParse = true
}
if syslogIdentifier == "configure.sh" {
msg, err := l.KLogField("")
if err != nil {
return err
}
if msg == ConfigureShStartingMsg {
cs.RecordRevision(nodeComponentPath,
&history.StagingResourceRevision{
Verb: enum.RevisionVerbCreate,
State: enum.RevisionStateExisting,
Requestor: syslogIdentifier,
ChangeTime: l.Timestamp(),
})
}
if msg == ConfigureShTerminatingMsg {
cs.RecordRevision(nodeComponentPath,
&history.StagingResourceRevision{
Verb: enum.RevisionVerbDelete,
State: enum.RevisionStateDeleted,
Requestor: syslogIdentifier,
ChangeTime: l.Timestamp(),
})
}
supportsLifetimeParse = true
}
if syslogIdentifier == "configure-helper.sh" {
msg, err := l.KLogField("")
if err != nil {
return err
}
if msg == ConfigureHelperShStartingMsg {
cs.RecordRevision(nodeComponentPath,
&history.StagingResourceRevision{
Verb: enum.RevisionVerbCreate,
State: enum.RevisionStateExisting,
Requestor: syslogIdentifier,
ChangeTime: l.Timestamp(),
})
}
if msg == ConfigureHelperShTerminatingMsg {
cs.RecordRevision(nodeComponentPath,
&history.StagingResourceRevision{
Verb: enum.RevisionVerbDelete,
State: enum.RevisionStateDeleted,
Requestor: syslogIdentifier,
ChangeTime: l.Timestamp(),
})
}
supportsLifetimeParse = true
}
if syslogIdentifier == "kubelet" {
klogExitCode, err := l.KLogField("exitCode")
if err == nil && klogExitCode != "" && klogExitCode != "0" {
if klogExitCode == "137" {
cs.RecordLogSeverity(enum.SeverityError)
} else {
cs.RecordLogSeverity(enum.SeverityWarning)
}
}
}
// Add inferred revision at the beginning when parse logics written before is not supporting lifetime visualization
if !supportsLifetimeParse {
tb := builder.GetTimelineBuilder(nodeComponentPath.Path)
if tb.GetLatestRevision() == nil {
cs.RecordRevision(nodeComponentPath,
&history.StagingResourceRevision{
Verb: enum.RevisionVerbCreate,
State: enum.RevisionStateInferred,
Requestor: syslogIdentifier,
ChangeTime: l.Timestamp(),
})
}
}
cs.RecordEvent(nodeComponentPath)
klognode, err := l.KLogField("node")
if err == nil && klognode != "" {
cs.RecordEvent(resourcepath.Node(klognode))
}
resourceBindings := builder.ClusterResource.NodeResourceLogBinder.GetBoundResourcesForLogBody(nodeName, mainMessage)
for _, rb := range resourceBindings {
cs.RecordEvent(rb.GetResourcePath())
summary = rb.RewriteLogSummary(summary)
}
if len(resourceBindings) > 0 {
cs.RecordLogSummary(summary)
} else {
// When this log can't be associated with resource by container id or pod sandbox id, try to get it from klog fields.
podNameWithNamespace, err := l.KLogField("pod")
if err == nil && podNameWithNamespace != "" {
podNameSplitted := strings.Split(podNameWithNamespace, "/")
podNamespace := "unknown"
podName := "unknown"
if len(podNameSplitted) >= 2 {
podNamespace = podNameSplitted[0]
podName = podNameSplitted[1]
}
containerName, err := l.KLogField("containerName")
if err == nil && containerName != "" {
cs.RecordEvent(resourcepath.Container(podNamespace, podName, containerName))
cs.RecordLogSummary(fmt.Sprintf("%s【%s】", summary, toReadableContainerName(podNamespace, podName, containerName)))
} else {
cs.RecordEvent(resourcepath.Pod(podNamespace, podName))
cs.RecordLogSummary(fmt.Sprintf("%s【%s】", summary, toReadablePodSandboxName(podNamespace, podName)))
}
}
}
return nil
}
func parseDefaultSummary(l *log.LogEntity) (string, error) {
subinfo := ""
klogmain, err := l.KLogField("")
if err != nil {
return "", err
}
errorMsg, err := l.KLogField("error")
if err == nil && errorMsg != "" {
subinfo = fmt.Sprintf("error=%s", errorMsg)
}
probeType, err := l.KLogField("probeType")
if err == nil && probeType != "" {
subinfo = fmt.Sprintf("probeType=%s", probeType)
}
eventMsg, err := l.KLogField("event")
if err == nil && eventMsg != "" {
if eventMsg[0] == '&' || eventMsg[0] == '{' {
if strings.Contains(eventMsg, "Type:") {
subinfo = strings.Split(strings.Split(eventMsg, "Type:")[1], " ")[0]
}
} else {
subinfo = eventMsg
}
}
klogstatus, err := l.KLogField("status")
if err == nil && klogstatus != "" {
subinfo = fmt.Sprintf("status=%s", klogstatus)
}
klogExitCode, err := l.KLogField("exitCode")
if err == nil && klogExitCode != "" {
subinfo = fmt.Sprintf("exitCode=%s", klogExitCode)
}
klogGracePeriod, err := l.KLogField("gracePeriod")
if err == nil && klogGracePeriod != "" {
subinfo = fmt.Sprintf("gracePeriod=%ss", klogGracePeriod)
}
if subinfo == "" {
return klogmain, nil
} else {
return fmt.Sprintf("%s(%s)", klogmain, subinfo), nil
}
}
func (*k8sNodeParser) handleContainerdSandboxLogs(ctx context.Context, l *log.LogEntity, nodeName string, mainMessage string, builder *history.Builder, cs *history.ChangeSet, summary string) error {
// Pod sandbox related logs
if strings.HasPrefix(mainMessage, "RunPodSandbox") {
podSandbox, err := parseRunPodSandboxLog(mainMessage)
if err != nil {
return err
}
if podSandbox.PodSandboxID != "" {
builder.ClusterResource.NodeResourceLogBinder.AddResourceBinding(nodeName, noderesource.NewPodResourceBinding(
podSandbox.PodSandboxID,
podSandbox.PodNamespace,
podSandbox.PodName,
))
}
return nil
}
// Container related logs
if strings.HasPrefix(mainMessage, "CreateContainer") {
container, err := parseCreateContainerLog(mainMessage)
if err != nil {
return err
}
if container.ContainerID == "" {
slog.DebugContext(ctx, fmt.Sprintf("container ID is empty string for container %s. This is ignored because it would be kube-proxy container.", container.ContainerName), logger.LogKind("empty-container-id"))
return nil
}
if container.ContainerName == "" {
slog.WarnContext(ctx, fmt.Sprintf("container name is empty for pod sandbox id %s", container.PodSandboxID), logger.LogKind("empty-container-name"))
return nil
}
bindingsForPodSandboxID := builder.ClusterResource.NodeResourceLogBinder.GetBoundResourcesForLogBody(nodeName, container.PodSandboxID)
if len(bindingsForPodSandboxID) == 0 {
slog.DebugContext(ctx, fmt.Sprintf("pod sandbox %s was not found. It would be created before the log query start time", container.PodSandboxID), logger.LogKind("pod-sandbox-not-found"))
return nil
}
if len(bindingsForPodSandboxID) > 1 {
return fmt.Errorf("multiple pod sandboxes were found associated to pod sandbox id %s. This is unexpected behavior. Please check the log", container.PodSandboxID)
}
podResourceBinding, casted := bindingsForPodSandboxID[0].(*noderesource.PodResourceBinding)
if !casted {
return fmt.Errorf("pod sandbox ID %s is not associated with a PodResourceBinding reference. %v was given", container.PodSandboxID, bindingsForPodSandboxID[0])
}
containerResourceBinding := podResourceBinding.NewContainerResourceBinding(container.ContainerID, container.ContainerName)
builder.ClusterResource.NodeResourceLogBinder.AddResourceBinding(nodeName, containerResourceBinding)
return nil
}
return nil
}
type runPodSandboxLog struct {
PodName string
PodNamespace string
PodSandboxID string
}
func parseRunPodSandboxLog(msg string) (*runPodSandboxLog, error) {
// RunPodSandbox for &PodSandboxMetadata{Name:podname,Uid:b86b49f2431d244c613996c6472eb864,Namespace:kube-system,Attempt:0,} returns sandbox id \"6123c6aacf0c78dc38ec4f0ff72edd3cf04eb82ca0e3e7dddd3950ea9753bdf1\"
fields := readGoStructFromString(msg, "PodSandboxMetadata")
sandboxID := ""
splitted := strings.Split(msg, "returns sandbox id")
if len(splitted) >= 2 {
sandboxID = readNextQuotedString(splitted[1])
}
if fields["Name"] != "" && fields["Namespace"] != "" {
return &runPodSandboxLog{
PodName: fields["Name"],
PodNamespace: fields["Namespace"],
PodSandboxID: sandboxID,
}, nil
}
return nil, fmt.Errorf("not matched. igoreing")
}
type createContainerLog struct {
ContainerID string
ContainerName string
PodSandboxID string
}
func parseCreateContainerLog(msg string) (*createContainerLog, error) {
fields := readGoStructFromString(msg, "ContainerMetadata")
sandboxID := ""
splitted := strings.Split(msg, "within sandbox")
if len(splitted) < 2 {
return nil, fmt.Errorf("failed to read the sandbox Id from container starting log")
}
sandboxID = readNextQuotedString(splitted[1])
containerID := ""
splitted = strings.Split(msg, "returns container id")
if len(splitted) >= 2 {
containerID = readNextQuotedString(splitted[1])
}
if fields["Name"] != "" {
return &createContainerLog{
PodSandboxID: sandboxID,
ContainerName: fields["Name"],
ContainerID: containerID,
}, nil
}
return nil, fmt.Errorf("not matched. ignoreing")
}
// Find the struct part of specific structName in given string and returns fields.
func readGoStructFromString(message string, structName string) map[string]string {
splitted := strings.Split(message, structName)
if len(splitted) > 1 {
laterPart := splitted[1]
if len(laterPart) == 0 {
return map[string]string{}
}
if laterPart[0] == '{' {
laterPart = laterPart[1:]
}
structPart := strings.Split(laterPart, "}")[0]
fields := strings.Split(structPart, ",")
result := map[string]string{}
for _, field := range fields {
keyValue := strings.Split(field, ":")
if len(keyValue) == 2 {
result[keyValue[0]] = keyValue[1]
}
}
return result
}
return map[string]string{}
}
func readNextQuotedString(msg string) string {
splitted := strings.Split(msg, "\"")
if len(splitted) > 2 {
return splitted[1]
} else {
return ""
}
}
func toReadablePodSandboxName(namespace string, name string) string {
return fmt.Sprintf("%s/%s", namespace, name)
}
func toReadableContainerName(namespace string, name string, container string) string {
return fmt.Sprintf("%s in %s/%s", container, namespace, name)
}
var _ parser.Parser = (*k8sNodeParser)(nil)