func()

in pkg/source/gcp/task/gke/network_api/parser.go [67:152]


// Parse converts one GCE network API audit log entry into history records on cs.
//
// It performs three steps, in this order:
//
//  1. When the entry is exactly one of the first or the last log of an
//     operation, an operation revision (started/finished) is recorded under the
//     NEG resource path. Otherwise (both or neither flag present) a plain
//     event is recorded on the NEG resource path.
//  2. When the entry is the first log of an attachNetworkEndpoints or
//     detachNetworkEndpoints call, the request body is decoded and, for every
//     endpoint whose IP was held by a resource of kind "pod" at the log
//     timestamp, a ready (attach) or non-ready (detach) revision is recorded
//     on the NEG sub-resource of that Pod. Endpoints whose IP holder cannot be
//     resolved are skipped with a warning.
//  3. A log summary based on the full method name is recorded.
//
// The NEG name is the last "/"-separated segment of protoPayload.resourceName.
// Its namespace is taken from the NEG lease holder at the log timestamp, and
// falls back to "unknown" when no lease holder is found.
//
// A non-nil error is returned only when the request body of an attach/detach
// call cannot be read or decoded. In that case step 1 has already been applied
// to cs, but no log summary is recorded.
func (*gceNetworkParser) Parse(ctx context.Context, l *log.LogEntity, cs *history.ChangeSet, builder *history.Builder) error {
	isFirst := l.Has("operation.first")
	isLast := l.Has("operation.last")
	operationID := l.GetStringOrDefault("operation.id", "unknown")
	methodName := l.GetStringOrDefault("protoPayload.methodName", "unknown")
	resourceName := l.GetStringOrDefault("protoPayload.resourceName", "unknown")
	principal := l.GetStringOrDefault("protoPayload.authenticationInfo.principalEmail", "unknown")

	// Keep only the trailing segment of each identifier. strings.LastIndex
	// returns -1 when the separator is absent, so the slice expression then
	// yields the whole string — the same result as taking the last element of
	// strings.Split.
	method := methodName[strings.LastIndex(methodName, ".")+1:]
	negName := resourceName[strings.LastIndex(resourceName, "/")+1:]

	var negResourcePath resourcepath.ResourcePath
	if lease, err := builder.ClusterResource.NEGs.GetResourceLeaseHolderAt(negName, l.Timestamp()); err == nil {
		negResourcePath = resourcepath.NetworkEndpointGroup(lease.Holder.Namespace, negName)
	} else {
		// No lease holder is known for this NEG at the log time; the namespace
		// cannot be determined.
		negResourcePath = resourcepath.NetworkEndpointGroup("unknown", negName)
	}

	// Exactly one of first/last set: this log is the start or the end of an
	// operation that spans several logs.
	if isFirst != isLast {
		state := enum.RevisionStateOperationStarted
		if isLast {
			state = enum.RevisionStateOperationFinished
		}
		operationPath := resourcepath.Operation(negResourcePath, method, operationID)
		cs.RecordRevision(operationPath, &history.StagingResourceRevision{
			Verb:       enum.RevisionVerbCreate,
			State:      state,
			Requestor:  principal,
			ChangeTime: l.Timestamp(),
			Partial:    false,
		})
	} else {
		cs.RecordEvent(negResourcePath)
	}

	// Only the first log of an attach/detach call is inspected for the
	// endpoint list in its request body.
	if isFirst && (method == "detachNetworkEndpoints" || method == "attachNetworkEndpoints") {
		requestBody, err := l.GetChildYamlOf("protoPayload.request")
		if err != nil {
			return fmt.Errorf("reading protoPayload.request of %q: %w", methodName, err)
		}
		var negRequest NegAttachOrDetachRequest
		if err := yaml.Unmarshal([]byte(requestBody), &negRequest); err != nil {
			return fmt.Errorf("decoding protoPayload.request of %q: %w", methodName, err)
		}

		// The resulting condition is the same for every endpoint in the request.
		state := enum.RevisionStateConditionTrue
		verb := enum.RevisionVerbReady
		if strings.HasPrefix(method, "detach") {
			state = enum.RevisionStateConditionFalse
			verb = enum.RevisionVerbNonReady
		}

		for _, endpoint := range negRequest.NetworkEndpoints {
			lease, err := builder.ClusterResource.IPs.GetResourceLeaseHolderAt(endpoint.IpAddress, l.Timestamp())
			if err != nil {
				// Best effort: an unresolved IP must not fail the whole log entry.
				slog.WarnContext(ctx, fmt.Sprintf("Failed to identify the holder of the IP %s.\n This might be because the IP holder resource wasn't updated during the log period ", endpoint.IpAddress))
				continue
			}
			holder := lease.Holder
			if holder.Kind != "pod" {
				continue
			}
			podPath := resourcepath.Pod(holder.Namespace, holder.Name)
			negSubresourcePath := resourcepath.NetworkEndpointGroupUnderResource(podPath, holder.Namespace, negName)
			cs.RecordRevision(negSubresourcePath, &history.StagingResourceRevision{
				Verb:       verb,
				State:      state,
				Requestor:  principal,
				ChangeTime: l.Timestamp(),
				Partial:    false,
			})
		}
	}

	switch {
	case isFirst && !isLast:
		cs.RecordLogSummary(fmt.Sprintf("%s Started", methodName))
	case !isFirst && isLast:
		cs.RecordLogSummary(fmt.Sprintf("%s Finished", methodName))
	default:
		cs.RecordLogSummary(methodName)
	}

	return nil
}