func()

in extension/observer/k8sobserver/handler.go [70:167]


func (h *handler) OnUpdate(oldObjectInterface, newObjectInterface any) {
	oldEndpoints := map[observer.EndpointID]observer.Endpoint{}
	newEndpoints := map[observer.EndpointID]observer.Endpoint{}

	switch oldObject := oldObjectInterface.(type) {
	case *v1.Pod:
		newPod, ok := newObjectInterface.(*v1.Pod)
		if !ok {
			h.logger.Warn("skip updating endpoint for pod as the update is of different type", zap.Any("oldPod", oldObjectInterface), zap.Any("newObject", newObjectInterface))
			return
		}
		for _, e := range convertPodToEndpoints(h.idNamespace, oldObject) {
			oldEndpoints[e.ID] = e
		}
		for _, e := range convertPodToEndpoints(h.idNamespace, newPod) {
			newEndpoints[e.ID] = e
		}

	case *v1.Service:
		newService, ok := newObjectInterface.(*v1.Service)
		if !ok {
			h.logger.Warn("skip updating endpoint for service as the update is of different type", zap.Any("oldService", oldObjectInterface), zap.Any("newObject", newObjectInterface))
			return
		}
		for _, e := range convertServiceToEndpoints(h.idNamespace, oldObject) {
			oldEndpoints[e.ID] = e
		}
		for _, e := range convertServiceToEndpoints(h.idNamespace, newService) {
			newEndpoints[e.ID] = e
		}

	case *networkingv1.Ingress:
		newIngress, ok := newObjectInterface.(*networkingv1.Ingress)
		if !ok {
			h.logger.Warn("skip updating endpoint for ingress as the update is of different type", zap.Any("oldIngress", oldObjectInterface), zap.Any("newObject", newObjectInterface))
			return
		}
		for _, e := range convertIngressToEndpoints(h.idNamespace, oldObject) {
			oldEndpoints[e.ID] = e
		}
		for _, e := range convertIngressToEndpoints(h.idNamespace, newIngress) {
			newEndpoints[e.ID] = e
		}

	case *v1.Node:
		newNode, ok := newObjectInterface.(*v1.Node)
		if !ok {
			h.logger.Warn("skip updating endpoint for node as the update is of different type", zap.Any("oldNode", oldObjectInterface), zap.Any("newObject", newObjectInterface))
			return
		}
		oldEndpoint := convertNodeToEndpoint(h.idNamespace, oldObject)
		oldEndpoints[oldEndpoint.ID] = oldEndpoint
		newEndpoint := convertNodeToEndpoint(h.idNamespace, newNode)
		newEndpoints[newEndpoint.ID] = newEndpoint
	default: // unsupported
		return
	}

	var removedEndpoints, updatedEndpoints, addedEndpoints []observer.Endpoint

	// Find endpoints that are present in oldPod and newPod and see if they've
	// changed. Otherwise if it wasn't in oldPod it's a new endpoint.
	for _, e := range newEndpoints {
		if existing, ok := oldEndpoints[e.ID]; ok {
			if !reflect.DeepEqual(existing, e) {
				updatedEndpoints = append(updatedEndpoints, e)
			}
		} else {
			addedEndpoints = append(addedEndpoints, e)
		}
	}

	// If an endpoint is present in the oldPod but not in the newPod then
	// send as removed.
	for _, e := range oldEndpoints {
		if _, ok := newEndpoints[e.ID]; !ok {
			removedEndpoints = append(removedEndpoints, e)
		}
	}

	if len(removedEndpoints) > 0 {
		for _, endpoint := range removedEndpoints {
			h.endpoints.Delete(endpoint.ID)
		}
	}

	if len(updatedEndpoints) > 0 {
		for _, endpoint := range updatedEndpoints {
			h.endpoints.Store(endpoint.ID, endpoint)
		}
	}

	if len(addedEndpoints) > 0 {
		for _, endpoint := range addedEndpoints {
			h.endpoints.Store(endpoint.ID, endpoint)
		}
	}
}