pkg/source/gcp/task/gke/network_api/parser.go (122 lines of code) (raw):

// Copyright 2024 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package network_api import ( "context" "fmt" "log/slog" "strings" "github.com/GoogleCloudPlatform/khi/pkg/log" "github.com/GoogleCloudPlatform/khi/pkg/model/enum" "github.com/GoogleCloudPlatform/khi/pkg/model/history" "github.com/GoogleCloudPlatform/khi/pkg/model/history/grouper" "github.com/GoogleCloudPlatform/khi/pkg/model/history/resourcepath" "github.com/GoogleCloudPlatform/khi/pkg/parser" "github.com/GoogleCloudPlatform/khi/pkg/source/gcp/inspectiontype" network_api_taskid "github.com/GoogleCloudPlatform/khi/pkg/source/gcp/task/gke/network_api/taskid" "github.com/GoogleCloudPlatform/khi/pkg/task/taskid" "gopkg.in/yaml.v3" ) type gceNetworkParser struct{} // TargetLogType implements parser.Parser. func (g *gceNetworkParser) TargetLogType() enum.LogType { return enum.LogTypeNetworkAPI } // Dependencies implements parser.Parser. func (*gceNetworkParser) Dependencies() []taskid.UntypedTaskReference { return []taskid.UntypedTaskReference{} } // Description implements parser.Parser. func (*gceNetworkParser) Description() string { return `Gather GCE Network API logs to visualize statuses of Network Endpoint Groups(NEG)` } // GetParserName implements parser.Parser. func (*gceNetworkParser) GetParserName() string { return "GCE Network Logs" } // LogTask implements parser.Parser. func (*gceNetworkParser) LogTask() taskid.TaskReference[[]*log.LogEntity] { return network_api_taskid.GCPNetworkLogQueryTaskID.Ref() } func (*gceNetworkParser) Grouper() grouper.LogGrouper { return grouper.AllDependentLogGrouper } // Parse implements parser.Parser. func (*gceNetworkParser) Parse(ctx context.Context, l *log.LogEntity, cs *history.ChangeSet, builder *history.Builder) error { isFirst := l.Has("operation.first") isLast := l.Has("operation.last") operationId := l.GetStringOrDefault("operation.id", "unknown") methodName := l.GetStringOrDefault("protoPayload.methodName", "unknown") methodNameSplitted := strings.Split(methodName, ".") resourceName := l.GetStringOrDefault("protoPayload.resourceName", "unknown") resourceNameSplitted := strings.Split(resourceName, "/") negName := resourceNameSplitted[len(resourceNameSplitted)-1] principal := l.GetStringOrDefault("protoPayload.authenticationInfo.principalEmail", "unknown") var negResourcePath resourcepath.ResourcePath lease, err := builder.ClusterResource.NEGs.GetResourceLeaseHolderAt(negName, l.Timestamp()) if err == nil { negResourcePath = resourcepath.NetworkEndpointGroup(lease.Holder.Namespace, negName) } else { negResourcePath = resourcepath.NetworkEndpointGroup("unknown", negName) } switch { case !(isLast && isFirst) && (isLast || isFirst): state := enum.RevisionStateOperationStarted if isLast { state = enum.RevisionStateOperationFinished } operationPath := resourcepath.Operation(negResourcePath, methodNameSplitted[len(methodNameSplitted)-1], operationId) cs.RecordRevision(operationPath, &history.StagingResourceRevision{ Verb: enum.RevisionVerbCreate, State: state, Requestor: principal, ChangeTime: l.Timestamp(), Partial: false, }) default: cs.RecordEvent(negResourcePath) } if isFirst { method := methodNameSplitted[len(methodNameSplitted)-1] if method == "detachNetworkEndpoints" || method == "attachNetworkEndpoints" { isDetach := strings.HasPrefix(method, "detach") requestBody, err := l.GetChildYamlOf("protoPayload.request") if err != nil { return err } var negRequest NegAttachOrDetachRequest err = yaml.Unmarshal([]byte(requestBody), &negRequest) if err != nil { return err } for _, endpoint := range negRequest.NetworkEndpoints { lease, err := builder.ClusterResource.IPs.GetResourceLeaseHolderAt(endpoint.IpAddress, l.Timestamp()) if err != nil { slog.WarnContext(ctx, fmt.Sprintf("Failed to identify the holder of the IP %s.\n This might be because the IP holder resource wasn't updated during the log period ", endpoint.IpAddress)) continue } holder := lease.Holder if holder.Kind == "pod" { podPath := resourcepath.Pod(holder.Namespace, holder.Name) negSubresourcePath := resourcepath.NetworkEndpointGroupUnderResource(podPath, holder.Namespace, negName) state := enum.RevisionStateConditionTrue verb := enum.RevisionVerbReady if isDetach { state = enum.RevisionStateConditionFalse verb = enum.RevisionVerbNonReady } cs.RecordRevision(negSubresourcePath, &history.StagingResourceRevision{ Verb: verb, State: state, Requestor: principal, ChangeTime: l.Timestamp(), Partial: false, }) } } } } switch { case isFirst && !isLast: cs.RecordLogSummary(fmt.Sprintf("%s Started", methodName)) case !isFirst && isLast: cs.RecordLogSummary(fmt.Sprintf("%s Finished", methodName)) default: cs.RecordLogSummary(methodName) } return nil } var _ parser.Parser = (*gceNetworkParser)(nil) var NetowrkAPIParserTask = parser.NewParserTaskFromParser(network_api_taskid.GCPNetworkLogParserTaskID, &gceNetworkParser{}, true, inspectiontype.GKEBasedClusterInspectionTypes)