in collector/logs/sources/tail/pod_discovery.go [105:157]
// OnAdd reconciles the set of tailed log files for a pod against the file
// targets currently derived from it.
//
// Objects that are not *v1.Pod are ignored. The desired targets are computed
// with getFileTargets before the mutex is taken; everything after that runs
// while holding i.mut. For each desired target:
//   - if its file path is already tracked for the pod, updateTarget is called
//     when isTargetChanged reports a difference or the tracked entry has a
//     non-zero ExpireTime;
//   - otherwise startTailing is called, and a failure is logged rather than
//     returned.
//
// Any tracked path that is no longer among the desired targets is handed to
// tombstoneTarget.
//
// isInitialList is not consulted by this method.
// NOTE(review): the signature looks like the client-go informer event handler
// callback; confirm against where PodDiscovery is registered.
func (i *PodDiscovery) OnAdd(obj interface{}, isInitialList bool) {
	pod, isPod := obj.(*v1.Pod)
	if !isPod {
		return
	}

	targets := getFileTargets(pod, i.NodeName, i.staticPodTargets)
	uid := string(pod.UID)

	i.mut.Lock()
	defer i.mut.Unlock()

	podTargets, known := i.podidToTargets[uid]
	if !known {
		if len(targets) == 0 {
			// Pod is not tracked and has nothing to tail: no state to create.
			return
		}
		podTargets = map[string]*currenttarget{}
		i.podidToTargets[uid] = podTargets
	}

	// Phase 1: start tailing new targets and refresh the ones already tracked.
	for _, want := range targets {
		tracked, tailing := podTargets[want.FilePath]
		if !tailing {
			if err := i.startTailing(want, podTargets); err != nil {
				logger.Errorf("failed to start tailing %q: %s", want.FilePath, err)
			}
			continue
		}
		// Already tracked: refresh when the target's settings differ, or when
		// the entry carries an expiry (presumably set by an earlier tombstone —
		// confirm against tombstoneTarget).
		if isTargetChanged(tracked.CurrentTarget, want) || !tracked.ExpireTime.IsZero() {
			i.updateTarget(want, tracked)
		}
	}

	// Phase 2: tombstone every tracked path that is no longer desired.
	wanted := make(map[string]struct{}, len(targets))
	for _, want := range targets {
		wanted[want.FilePath] = struct{}{}
	}
	for path, tracked := range podTargets {
		if _, keep := wanted[path]; keep {
			continue
		}
		i.tombstoneTarget(tracked)
	}
}