collector/logs/sources/tail/pod_discovery.go (197 lines of code) (raw):
package tail
import (
"context"
"fmt"
"sync"
"time"
"github.com/Azure/adx-mon/pkg/k8s"
"github.com/Azure/adx-mon/pkg/logger"
v1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache"
)
type currenttarget struct {
CurrentTarget FileTailTarget
UpdateChan chan<- FileTailTarget
ExpireTime time.Time
}
type TailerSourceInterface interface {
AddTarget(target FileTailTarget, updateChan <-chan FileTailTarget) error
RemoveTarget(target string)
}
type PodDiscoveryOpts struct {
NodeName string
PodInformer *k8s.PodInformer
StaticPodTargets []*StaticPodTargets
}
type StaticPodTargets struct {
Namespace string
Name string
Labels map[string]string
Parsers []string
Destination string
}
type PodDiscovery struct {
NodeName string
staticPodTargets []*StaticPodTargets
nowFunc func() time.Time
tailsource TailerSourceInterface
// pod uid -> filename -> target
podidToTargets map[string]map[string]*currenttarget
mut sync.Mutex // protect podidToTargets
cancel context.CancelFunc
podInformer *k8s.PodInformer
informerRegistration cache.ResourceEventHandlerRegistration
wg sync.WaitGroup
}
func NewPodDiscovery(opts PodDiscoveryOpts, tailsource TailerSourceInterface) *PodDiscovery {
return &PodDiscovery{
nowFunc: time.Now,
NodeName: opts.NodeName,
podInformer: opts.PodInformer,
tailsource: tailsource,
podidToTargets: make(map[string]map[string]*currenttarget),
staticPodTargets: opts.StaticPodTargets,
}
}
func (i *PodDiscovery) Open(ctx context.Context) error {
logger.Infof("Starting logging scraper for node %s", i.NodeName)
ctx, cancelFn := context.WithCancel(ctx)
i.cancel = cancelFn
var err error
if i.informerRegistration, err = i.podInformer.Add(ctx, i); err != nil {
return err
}
i.wg.Add(1)
go func() {
defer i.wg.Done()
ticker := time.NewTicker(15 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
i.cleanTombstonedTargets()
}
}
}()
return nil
}
func (i *PodDiscovery) Close() error {
i.cancel()
i.podInformer.Remove(i.informerRegistration)
i.informerRegistration = nil
i.wg.Wait()
return nil
}
// OnAdd is called when an object is added. Expects a pod.
func (i *PodDiscovery) OnAdd(obj interface{}, isInitialList bool) {
pod, ok := obj.(*v1.Pod)
if !ok {
return
}
targets := getFileTargets(pod, i.NodeName, i.staticPodTargets)
i.mut.Lock()
defer i.mut.Unlock()
existingTargets, hasExistingTargets := i.podidToTargets[string(pod.UID)]
if !hasExistingTargets && len(targets) == 0 {
// Nothing to update or keep track of.
return
}
if !hasExistingTargets {
existingTargets = map[string]*currenttarget{}
i.podidToTargets[string(pod.UID)] = existingTargets
}
// Update or add targets
for _, target := range targets {
existingCurrentTarget, ok := existingTargets[target.FilePath]
if ok {
// We're already tailing this target. Check if we need to update the database or table.
if isTargetChanged(existingCurrentTarget.CurrentTarget, target) || !existingCurrentTarget.ExpireTime.IsZero() {
i.updateTarget(target, existingCurrentTarget)
}
} else {
// We're not tailing this target yet. Add it.
err := i.startTailing(target, existingTargets)
if err != nil {
logger.Errorf("failed to start tailing %q: %s", target.FilePath, err)
}
}
}
// Remove old targets
for existingPath, existingTarget := range existingTargets {
found := false
for _, target := range targets {
if target.FilePath == existingPath {
found = true
break
}
}
if !found {
i.tombstoneTarget(existingTarget)
}
}
}
// OnUpdate is called when an object is updated. Expects a pod.
func (i *PodDiscovery) OnUpdate(oldObj, newObj interface{}) {
p, ok := newObj.(*v1.Pod)
if !ok || p == nil {
return
}
if p.DeletionTimestamp != nil {
i.OnDelete(p)
} else {
i.OnAdd(p, false)
}
}
// OnDelete is called when an object is deleted. Expects a pod.
func (i *PodDiscovery) OnDelete(obj interface{}) {
pod, ok := obj.(*v1.Pod)
if !ok {
return
}
i.mut.Lock()
defer i.mut.Unlock()
existingTargets, hasExistingTargets := i.podidToTargets[string(pod.UID)]
if !hasExistingTargets {
return
}
for _, existingTarget := range existingTargets {
i.tombstoneTarget(existingTarget)
}
}
func (i *PodDiscovery) startTailing(target FileTailTarget, existingTargets map[string]*currenttarget) error {
logger.Infof("Adding target %s", target.FilePath)
// We're not tailing this target yet. Add it.
updateChan := make(chan FileTailTarget, 1)
err := i.tailsource.AddTarget(target, updateChan)
if err != nil {
return fmt.Errorf("failed to start tailing: %w", err)
}
currenttarget := ¤ttarget{
CurrentTarget: target,
UpdateChan: updateChan,
}
existingTargets[target.FilePath] = currenttarget
return nil
}
func (i *PodDiscovery) updateTarget(target FileTailTarget, existingCurrentTarget *currenttarget) {
logger.Infof("Updating target %s", target.FilePath)
if !existingCurrentTarget.ExpireTime.IsZero() {
// No longer should be tombstoned
existingCurrentTarget.ExpireTime = time.Time{}
}
existingCurrentTarget.UpdateChan <- target
existingCurrentTarget.CurrentTarget = target
}
func (i *PodDiscovery) tombstoneTarget(existingCurrentTarget *currenttarget) {
// Set this target to be cleaned in the future. This is to allow some time for the target's logs
// to be written to disk by the container runtime and for us to consume it before we stop tailing it.
// We don't want to just tell tailsource to stop tailing when the file reaches EOF because the file
// may be written to again in the "future" and perhaps even rotated.
if existingCurrentTarget.ExpireTime.IsZero() {
logger.Infof("Tombstoning target %s", existingCurrentTarget.CurrentTarget.FilePath)
existingCurrentTarget.ExpireTime = i.nowFunc().Add(1 * time.Minute)
}
}
func (i *PodDiscovery) cleanTombstonedTargets() {
i.mut.Lock()
defer i.mut.Unlock()
for podID, existingTargets := range i.podidToTargets {
for path, existingTarget := range existingTargets {
if !existingTarget.ExpireTime.IsZero() && i.nowFunc().After(existingTarget.ExpireTime) {
logger.Infof("Removing tombstoned target %s", path)
i.tailsource.RemoveTarget(existingTarget.CurrentTarget.FilePath)
delete(existingTargets, path)
}
}
if len(existingTargets) == 0 {
delete(i.podidToTargets, podID)
}
}
}