func()

in collector/logs/sources/tail/pod_discovery.go [105:157]


func (i *PodDiscovery) OnAdd(obj interface{}, isInitialList bool) {
	pod, ok := obj.(*v1.Pod)
	if !ok {
		return
	}

	targets := getFileTargets(pod, i.NodeName, i.staticPodTargets)

	i.mut.Lock()
	defer i.mut.Unlock()
	existingTargets, hasExistingTargets := i.podidToTargets[string(pod.UID)]
	if !hasExistingTargets && len(targets) == 0 {
		// Nothing to update or keep track of.
		return
	}

	if !hasExistingTargets {
		existingTargets = map[string]*currenttarget{}
		i.podidToTargets[string(pod.UID)] = existingTargets
	}

	// Update or add targets
	for _, target := range targets {
		existingCurrentTarget, ok := existingTargets[target.FilePath]
		if ok {
			// We're already tailing this target. Check if we need to update the database or table.
			if isTargetChanged(existingCurrentTarget.CurrentTarget, target) || !existingCurrentTarget.ExpireTime.IsZero() {
				i.updateTarget(target, existingCurrentTarget)
			}
		} else {
			// We're not tailing this target yet. Add it.
			err := i.startTailing(target, existingTargets)
			if err != nil {
				logger.Errorf("failed to start tailing %q: %s", target.FilePath, err)
			}
		}
	}

	// Remove old targets
	for existingPath, existingTarget := range existingTargets {
		found := false
		for _, target := range targets {
			if target.FilePath == existingPath {
				found = true
				break
			}
		}
		if !found {
			i.tombstoneTarget(existingTarget)
		}
	}

}