in pkg/source/common/k8s_audit/recorder/endpointslicerecorder/recorder.go [63:147]
// recordChangeSetForLog records the revisions derived from a single EndpointSlice audit log into cs.
//
// It decodes the EndpointSlice from log.ResourceBodyReader and then:
//   - touches an IP lease on builder for every address of each endpoint that parseSingleEndpoint
//     reports as a Pod endpoint,
//   - records a revision on the Pod-side endpoint slice path for each Pod endpoint whose condition changed,
//   - records a deleted revision for each endpoint that has a TargetRef in prevEndpointSlices but
//     can't be found by UID in the current EndpointSlice,
//   - records a revision on the Service-side endpoint slice path when the EndpointSlice has an owner
//     reference whose kind is "service" (case-insensitive).
//
// prevEndpointSlices may be nil; the removed-endpoint detection is skipped in that case.
// It returns the decoded EndpointSlice, or an error when the resource body can't be read.
// Failures to parse endpoints are logged as warnings and do not fail the whole call.
func recordChangeSetForLog(ctx context.Context, log *types.AuditLogParserInput, prevEndpointSlices *model.EndpointSlice, cs *history.ChangeSet, builder *history.Builder) (*model.EndpointSlice, error) {
	var endpointSlice model.EndpointSlice
	if err := log.ResourceBodyReader.ReadReflect("", &endpointSlice); err != nil {
		return nil, err
	}
	changeTime := log.Log.Timestamp()

	// Find the Service owning this EndpointSlice. When several Service owners exist, the last one wins.
	relatedServiceName := ""
	if endpointSlice.Metadata != nil {
		for _, owner := range endpointSlice.Metadata.OwnerReferences {
			if !strings.EqualFold(owner.Kind, "service") {
				continue
			}
			if relatedServiceName != "" {
				slog.WarnContext(ctx, fmt.Sprintf("multiple owners found for a single endpoint slice. ignoreing service %s", relatedServiceName))
			}
			relatedServiceName = owner.Name
		}
	}

	// Normalize a missing endpoint list to an empty one before the slice is parsed and returned.
	if endpointSlice.Endpoints == nil {
		endpointSlice.Endpoints = make([]*model.EndpointSliceEndpoint, 0)
	}

	for _, endpoint := range endpointSlice.Endpoints {
		endpointParseResult, err := parseSingleEndpoint(ctx, endpoint, prevEndpointSlices)
		if err != nil {
			slog.WarnContext(ctx, fmt.Sprintf("failed to parse an endpoint\n%s", err.Error()))
			// The parse result is not usable on error; skip this endpoint and keep processing the others.
			continue
		}
		if !endpointParseResult.isEndpointForPod {
			continue
		}
		// Record IPs used by Pods. The IP could also be read from the Pod manifest, but that may be
		// missing when users didn't turn on DATA_WRITE audit logs, whereas endpoint slice updates are
		// always recorded.
		for _, address := range endpoint.Addresses {
			builder.ClusterResource.IPs.TouchResourceLease(address, changeTime, resourcelease.NewK8sResourceLeaseHolder(endpoint.TargetRef.Kind, endpoint.TargetRef.Namespace, endpoint.TargetRef.Name))
		}
		// Record conditions as a subresource of the Pod.
		if endpointParseResult.hasCoditionChanged {
			podEndpointSliceResourcePath := resourcepath.PodEndpointSlice(log.Operation.Namespace, log.Operation.Name, endpoint.TargetRef.Namespace, endpoint.TargetRef.Name)
			cs.RecordRevision(podEndpointSliceResourcePath, &history.StagingResourceRevision{
				Body:       endpointParseResult.manifest,
				State:      endpointParseResult.state,
				Verb:       endpointParseResult.verb,
				ChangeTime: changeTime,
			})
		}
	}

	// Find endpoints included in the previous revision but not in the current revision.
	if prevEndpointSlices != nil {
		for _, prevEndpoint := range prevEndpointSlices.Endpoints {
			// Endpoints without a TargetRef can't be matched by UID nor mapped to a resource path.
			if prevEndpoint == nil || prevEndpoint.TargetRef == nil {
				continue
			}
			if lookupEndpointFromUid(&endpointSlice, prevEndpoint.TargetRef.Uid) != nil {
				continue
			}
			podEndpointSliceResourcePath := resourcepath.PodEndpointSlice(log.Operation.Namespace, log.Operation.Name, prevEndpoint.TargetRef.Namespace, prevEndpoint.TargetRef.Name)
			cs.RecordRevision(podEndpointSliceResourcePath, &history.StagingResourceRevision{
				Body:       "# This endpoint removed from endpoint list of the EndpointSlice",
				State:      enum.RevisionStateDeleted,
				Verb:       enum.RevisionVerbDelete,
				ChangeTime: changeTime,
			})
		}
	}

	// Record the whole endpoint list under the owning Service, if any.
	if relatedServiceName != "" {
		endpointsParseResults, err := parseEndpointsOfEndpointSlice(ctx, &endpointSlice)
		if err != nil {
			slog.WarnContext(ctx, fmt.Sprintf("failed to parse an endpoint\n%s", err.Error()))
		} else {
			serviceEndpointSliceResourcePath := resourcepath.ServiceEndpointSlice(log.Operation.Namespace, log.Operation.Name, relatedServiceName)
			cs.RecordRevision(serviceEndpointSliceResourcePath, &history.StagingResourceRevision{
				Body:       endpointsParseResults.manifest,
				State:      endpointsParseResults.state,
				Verb:       endpointsParseResults.verb,
				ChangeTime: changeTime,
			})
		}
	}
	return &endpointSlice, nil
}