in pkg/source/gcp/task/gke/k8s_node/parser.go [86:283]
// Parse translates one node log entry into summary, severity, revision and
// event records on cs.
//
// Entries that carry no klog main field are summarized with their raw main
// message and nothing else is recorded. For every other entry the node name is
// read from "resource.labels.node_name" (an error is returned when it is
// empty), the default summary and the klog severity are recorded, and then
// component specific handling is selected by the syslog identifier.
//
// An error is returned when the node name is missing, when the default summary
// cannot be built, or when a klog field required by the selected component
// handler cannot be read.
func (p *k8sNodeParser) Parse(ctx context.Context, l *log.LogEntity, cs *history.ChangeSet, builder *history.Builder) error {
	// Without a klog main field there is nothing structured to inspect.
	if !l.HasKLogField("") {
		rawMessage, err := l.MainMessage()
		if err != nil {
			return err
		}
		cs.RecordLogSummary(rawMessage)
		return nil
	}

	nodeName := l.GetStringOrDefault("resource.labels.node_name", "")
	if nodeName == "" {
		return fmt.Errorf("parser couldn't lookup the node name")
	}

	summary, err := parseDefaultSummary(l)
	if err != nil {
		return err
	}
	cs.RecordLogSummary(summary)

	// A failure to read the main message is tolerated: the severity stays
	// unknown and mainMessage keeps whatever MainMessage returned.
	severity := enum.SeverityUnknown
	mainMessage, mainMessageErr := l.MainMessage()
	if mainMessageErr == nil {
		severity = k8s.ExractKLogSeverity(mainMessage)
	}
	cs.RecordLogSeverity(severity)

	syslogIdentifier := p.GetSyslogIdentifier(l)
	nodeComponentPath := resourcepath.NodeComponent(nodeName, syslogIdentifier)

	// recordStarted stages a "create / existing" revision of the node
	// component at the timestamp of this log.
	recordStarted := func() {
		cs.RecordRevision(nodeComponentPath,
			&history.StagingResourceRevision{
				Verb:       enum.RevisionVerbCreate,
				State:      enum.RevisionStateExisting,
				Requestor:  syslogIdentifier,
				ChangeTime: l.Timestamp(),
			})
	}
	// recordStopped stages a "delete / deleted" revision of the node
	// component at the timestamp of this log.
	recordStopped := func() {
		cs.RecordRevision(nodeComponentPath,
			&history.StagingResourceRevision{
				Verb:       enum.RevisionVerbDelete,
				State:      enum.RevisionStateDeleted,
				Requestor:  syslogIdentifier,
				ChangeTime: l.Timestamp(),
			})
	}

	// lifetimeTracked is set by the handlers that record start/stop revisions
	// themselves, so that no inferred revision is added for them below.
	lifetimeTracked := false

	switch syslogIdentifier {
	case "Unknown":
		// A log whose name ends with "kube-proxy" is attached to the
		// kube-proxy Pod of this node instead of a node component.
		if strings.HasSuffix(l.GetStringOrDefault("logName", ""), "kube-proxy") {
			cs.RecordEvent(resourcepath.Pod("kube-system", fmt.Sprintf("kube-proxy-%s", nodeName)))
			return nil
		}

	case "containerd":
		msg, err := l.KLogField("msg")
		if err != nil {
			return err
		}
		err = p.handleContainerdSandboxLogs(ctx, l, nodeName, msg, builder, cs, summary)
		if err != nil {
			return err
		}
		if msg == ContainerdStartingMsg {
			recordStarted()
		}
		lifetimeTracked = true

	case "dockerd":
		msg, err := l.KLogField("msg")
		if err != nil {
			return err
		}
		if msg == DockerdStartingMsg {
			recordStarted()
		}
		if msg == DockerdTerminatingMsg {
			recordStopped()
		}
		lifetimeTracked = true

	case "configure.sh":
		msg, err := l.KLogField("")
		if err != nil {
			return err
		}
		if msg == ConfigureShStartingMsg {
			recordStarted()
		}
		if msg == ConfigureShTerminatingMsg {
			recordStopped()
		}
		lifetimeTracked = true

	case "configure-helper.sh":
		msg, err := l.KLogField("")
		if err != nil {
			return err
		}
		if msg == ConfigureHelperShStartingMsg {
			recordStarted()
		}
		if msg == ConfigureHelperShTerminatingMsg {
			recordStopped()
		}
		lifetimeTracked = true

	case "kubelet":
		// A readable, non-empty, non-zero exit code overrides the severity
		// recorded above: "137" becomes an error, anything else a warning.
		if exitCode, err := l.KLogField("exitCode"); err == nil {
			switch exitCode {
			case "", "0":
				// Keep the severity derived from the klog message.
			case "137":
				cs.RecordLogSeverity(enum.SeverityError)
			default:
				cs.RecordLogSeverity(enum.SeverityWarning)
			}
		}
	}

	// Components without explicit lifetime handling get an inferred revision,
	// but only while their timeline has no revision yet.
	if !lifetimeTracked && builder.GetTimelineBuilder(nodeComponentPath.Path).GetLatestRevision() == nil {
		cs.RecordRevision(nodeComponentPath,
			&history.StagingResourceRevision{
				Verb:       enum.RevisionVerbCreate,
				State:      enum.RevisionStateInferred,
				Requestor:  syslogIdentifier,
				ChangeTime: l.Timestamp(),
			})
	}

	cs.RecordEvent(nodeComponentPath)
	if klogNode, err := l.KLogField("node"); err == nil && klogNode != "" {
		cs.RecordEvent(resourcepath.Node(klogNode))
	}

	// Resources bound to this node whose identifiers match the log body each
	// receive an event and may rewrite the summary.
	bindings := builder.ClusterResource.NodeResourceLogBinder.GetBoundResourcesForLogBody(nodeName, mainMessage)
	for _, binding := range bindings {
		cs.RecordEvent(binding.GetResourcePath())
		summary = binding.RewriteLogSummary(summary)
	}
	if len(bindings) > 0 {
		cs.RecordLogSummary(summary)
		return nil
	}

	// No binding matched: fall back to the "pod" and "containerName" klog
	// fields to associate the log with a container or a pod.
	podRef, err := l.KLogField("pod")
	if err != nil || podRef == "" {
		return nil
	}
	podNamespace, podName := "unknown", "unknown"
	if parts := strings.Split(podRef, "/"); len(parts) >= 2 {
		podNamespace, podName = parts[0], parts[1]
	}

	containerName, err := l.KLogField("containerName")
	if err == nil && containerName != "" {
		cs.RecordEvent(resourcepath.Container(podNamespace, podName, containerName))
		cs.RecordLogSummary(fmt.Sprintf("%s【%s】", summary, toReadableContainerName(podNamespace, podName, containerName)))
		return nil
	}

	cs.RecordEvent(resourcepath.Pod(podNamespace, podName))
	cs.RecordLogSummary(fmt.Sprintf("%s【%s】", summary, toReadablePodSandboxName(podNamespace, podName)))
	return nil
}