func recordChangeSetForLog()

in pkg/source/common/k8s_audit/recorder/endpointslicerecorder/recorder.go [63:147]


// recordChangeSetForLog reads the EndpointSlice contained in the resource body of log
// and records the revisions derived from it into cs.
//
// It performs the following steps:
//  1. Finds the owning Service from the owner references (kind compared case-insensitively).
//     When several Service owners exist, the last one wins and a warning is logged.
//  2. For every endpoint that parseSingleEndpoint reports as a Pod endpoint, touches the IP
//     leases in builder.ClusterResource.IPs and, when its condition changed, records a
//     revision on the Pod's endpoint slice resource path.
//  3. For every endpoint in prevEndpointSlices that has a TargetRef whose Uid is no longer
//     found in the current EndpointSlice, records a deleted revision.
//  4. When the EndpointSlice is owned by a Service, records a revision summarizing all
//     endpoints on the Service's endpoint slice resource path.
//
// prevEndpointSlices may be nil when no previous revision is known.
//
// It returns the parsed EndpointSlice. An error is returned only when the resource body
// cannot be read; failures while parsing endpoints are logged as warnings and skipped.
func recordChangeSetForLog(ctx context.Context, log *types.AuditLogParserInput, prevEndpointSlices *model.EndpointSlice, cs *history.ChangeSet, builder *history.Builder) (*model.EndpointSlice, error) {
	var endpointSlice model.EndpointSlice
	err := log.ResourceBodyReader.ReadReflect("", &endpointSlice)
	if err != nil {
		return nil, err
	}

	relatedServiceName := ""
	if endpointSlice.Metadata != nil {
		// Ranging over a nil OwnerReferences is a no-op, so no extra nil check is needed.
		for _, owner := range endpointSlice.Metadata.OwnerReferences {
			if !strings.EqualFold(owner.Kind, "service") {
				continue
			}
			if relatedServiceName != "" {
				// The previously found Service is replaced by the one found later.
				slog.WarnContext(ctx, fmt.Sprintf("multiple owners found for a single endpoint slice. ignoreing service %s", relatedServiceName))
			}
			relatedServiceName = owner.Name
		}
	}
	isOwnedByService := relatedServiceName != ""

	// Normalize a missing endpoint list to an empty slice so the returned EndpointSlice
	// always carries a non-nil Endpoints field.
	if endpointSlice.Endpoints == nil {
		endpointSlice.Endpoints = make([]*model.EndpointSliceEndpoint, 0)
	}
	for _, endpoint := range endpointSlice.Endpoints {
		endpointParseResult, err := parseSingleEndpoint(ctx, endpoint, prevEndpointSlices)
		if err != nil {
			slog.WarnContext(ctx, fmt.Sprintf("failed to parse an endpoint\n%s", err.Error()))
			// The parse result is meaningless (and possibly nil) on error; skip this endpoint
			// instead of dereferencing it.
			continue
		}

		// Record IPs used by Pods. The IP can also be read from the Pod manifest, but that can be
		// missing when users didn't turn on DATA_WRITE audit logs, whereas endpoint slice updates
		// are always recorded.
		if endpointParseResult.isEndpointForPod {
			for _, address := range endpoint.Addresses {
				builder.ClusterResource.IPs.TouchResourceLease(address, log.Log.Timestamp(), resourcelease.NewK8sResourceLeaseHolder(endpoint.TargetRef.Kind, endpoint.TargetRef.Namespace, endpoint.TargetRef.Name))
			}
		}

		// Record conditions as a subresource of the Pod.
		if endpointParseResult.hasCoditionChanged && endpointParseResult.isEndpointForPod {
			podEndpointSliceResourcePath := resourcepath.PodEndpointSlice(log.Operation.Namespace, log.Operation.Name, endpoint.TargetRef.Namespace, endpoint.TargetRef.Name)
			cs.RecordRevision(podEndpointSliceResourcePath, &history.StagingResourceRevision{
				Body:       endpointParseResult.manifest,
				State:      endpointParseResult.state,
				Verb:       endpointParseResult.verb,
				ChangeTime: log.Log.Timestamp(),
			})
		}
	}

	// Find endpoints included in the previous revision but not in the current revision.
	if prevEndpointSlices != nil {
		for _, prevEndpoint := range prevEndpointSlices.Endpoints {
			// Endpoints without a TargetRef can be neither looked up nor mapped to a resource path.
			if prevEndpoint.TargetRef == nil {
				continue
			}
			// Only process endpoints not included in the current endpoint slice.
			if lookupEndpointFromUid(&endpointSlice, prevEndpoint.TargetRef.Uid) != nil {
				continue
			}
			podEndpointSliceResourcePath := resourcepath.PodEndpointSlice(log.Operation.Namespace, log.Operation.Name, prevEndpoint.TargetRef.Namespace, prevEndpoint.TargetRef.Name)
			cs.RecordRevision(podEndpointSliceResourcePath, &history.StagingResourceRevision{
				Body:       "# This endpoint removed from endpoint list of the EndpointSlice",
				State:      enum.RevisionStateDeleted,
				Verb:       enum.RevisionVerbDelete,
				ChangeTime: log.Log.Timestamp(),
			})
		}
	}

	if isOwnedByService {
		endpointsParseResults, err := parseEndpointsOfEndpointSlice(ctx, &endpointSlice)
		if err != nil {
			// Best effort: a failure here must not discard the revisions recorded above.
			slog.WarnContext(ctx, fmt.Sprintf("failed to parse an endpoint\n%s", err.Error()))
		} else {
			serviceEndpointSliceResourcePath := resourcepath.ServiceEndpointSlice(log.Operation.Namespace, log.Operation.Name, relatedServiceName)
			cs.RecordRevision(serviceEndpointSliceResourcePath, &history.StagingResourceRevision{
				Body:       endpointsParseResults.manifest,
				State:      endpointsParseResults.state,
				Verb:       endpointsParseResults.verb,
				ChangeTime: log.Log.Timestamp(),
			})
		}
	}
	return &endpointSlice, nil
}