ingestor/cluster/coordinator.go

package cluster

import (
	"context"
	"fmt"
	"net"
	"strings"
	"sync"
	"time"

	"github.com/Azure/adx-mon/pkg/logger"
	"github.com/Azure/adx-mon/pkg/otlp"
	"github.com/Azure/adx-mon/pkg/prompb"
	"github.com/Azure/adx-mon/pkg/promremote"
	"github.com/Azure/adx-mon/pkg/service"
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	v12 "k8s.io/client-go/listers/core/v1"
)

type TimeSeriesWriter func(ctx context.Context, ts []*prompb.TimeSeries) error

type OTLPLogsWriter func(ctx context.Context, database, table string, logs *otlp.Logs) error

type Coordinator interface {
	MetricPartitioner
	service.Component

	// IsLeader returns true if the current node is the leader.
	IsLeader() bool
}

// coordinator manages the cluster state and writes to the correct peer.
type coordinator struct {
	opts *CoordinatorOpts

	namespace string

	mu    sync.RWMutex
	peers map[string]string
	part  *Partitioner

	pcli    *promremote.Client
	factory informers.SharedInformerFactory

	tsw TimeSeriesWriter

	kcli kubernetes.Interface
	pl   v12.PodLister

	hostEntpoint string
	hostname     string
	groupName    string

	cancel context.CancelFunc
	wg     sync.WaitGroup

	leader bool
}

type CoordinatorOpts struct {
	WriteTimeSeriesFn TimeSeriesWriter
	K8sCli            kubernetes.Interface

	// Namespace is the namespace used to discover peers. If not specified, the coordinator will
	// try to use the namespace of the current pod.
	Namespace string

	// Hostname is the hostname of the current node. This should be in the statefulset hostname format
	// in order to discover peers correctly.
	Hostname string

	// InsecureSkipVerify controls whether a client verifies the server's certificate chain and host name.
	InsecureSkipVerify bool

	// PartitionSize is the max size of the group of nodes forming a partition. A partition is a set of nodes where
	// keys are distributed.
	// NOTE: This is not used in the current implementation.
	PartitionSize int
}

func NewCoordinator(opts *CoordinatorOpts) (Coordinator, error) {
	ns := opts.Namespace
	groupName := opts.Hostname
	if ns == "" {
		logger.Warnf("No namespace found, peer discovery disabled")
	} else {
		logger.Infof("Using namespace %s for peer discovery", ns)
		if !strings.Contains(groupName, "-") {
			logger.Warnf("Hostname not in statefulset format, peer discovery disabled")
		} else {
			rindex := strings.LastIndex(groupName, "-")
			groupName = groupName[:rindex]
			logger.Infof("Using statefulset %s for peer discovery", groupName)
		}
	}

	return &coordinator{
		groupName: groupName,
		hostname:  opts.Hostname,
		namespace: ns,
		opts:      opts,
		kcli:      opts.K8sCli,
	}, nil
}

// OnAdd re-syncs the peer set when a peer pod is added.
func (c *coordinator) OnAdd(obj interface{}, isInitialList bool) {
	if p, ok := obj.(*v1.Pod); !ok || !c.isPeer(p) {
		return
	}

	if err := c.syncPeers(); err != nil {
		logger.Errorf("Failed to reconfigure peers: %s", err)
	}
}

// OnUpdate re-syncs the peer set when a peer pod changes.
func (c *coordinator) OnUpdate(oldObj, newObj interface{}) {
	if p, ok := newObj.(*v1.Pod); !ok || !c.isPeer(p) {
		return
	}

	if err := c.syncPeers(); err != nil {
		logger.Errorf("Failed to reconfigure peers: %s", err)
	}
}

// OnDelete re-syncs the peer set when a peer pod is removed.
func (c *coordinator) OnDelete(obj interface{}) {
	if p, ok := obj.(*v1.Pod); !ok || !c.isPeer(p) {
		return
	}

	if err := c.syncPeers(); err != nil {
		logger.Errorf("Failed to reconfigure peers: %s", err)
	}
}

// isPeer returns true if the pod is in the same namespace and owned by the same StatefulSet as this node.
func (c *coordinator) isPeer(p *v1.Pod) bool {
	if p.Namespace != c.namespace {
		return false
	}

	var isPeer bool
	for _, ref := range p.OwnerReferences {
		if ref.Kind == "StatefulSet" && ref.Name == c.groupName {
			isPeer = true
		}
	}
	return isPeer
}

func (c *coordinator) Open(ctx context.Context) error {
	ctx, cancel := context.WithCancel(ctx)
	c.cancel = cancel

	tweakOptions := informers.WithTweakListOptions(func(lo *metav1.ListOptions) {
		lo.LabelSelector = "app=ingestor"
	})

	factory := informers.NewSharedInformerFactoryWithOptions(c.kcli, time.Minute, tweakOptions, informers.WithNamespace(c.namespace))
	podsInformer := factory.Core().V1().Pods().Informer()

	factory.Start(ctx.Done()) // Start processing these informers.
	factory.WaitForCacheSync(ctx.Done())
	c.factory = factory

	pl := factory.Core().V1().Pods().Lister()
	c.pl = pl
	c.tsw = c.opts.WriteTimeSeriesFn

	myIP, err := GetOutboundIP()
	if err != nil {
		return fmt.Errorf("failed to determine ip: %w", err)
	}

	if myIP == nil || myIP.To4().String() == "" {
		return fmt.Errorf("failed to determine ip")
	}

	hostName := c.opts.Hostname
	hostEndpoint := fmt.Sprintf("https://%s:9090", myIP.To4().String())

	c.hostEntpoint = hostEndpoint
	c.hostname = hostName

	set := make(map[string]string)
	set[c.hostname] = c.hostEntpoint
	c.peers = set
	c.setPartitioner(set)

	if _, err := podsInformer.AddEventHandler(c); err != nil {
		return err
	}

	if err := c.syncPeers(); err != nil {
		return err
	}

	c.wg.Add(1)
	go c.resyncPeers(ctx)

	return nil
}

func (c *coordinator) IsLeader() bool {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.leader
}

// Owner returns the peer name and address that owns the given key.
func (c *coordinator) Owner(b []byte) (string, string) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.part.Owner(b)
}

func (c *coordinator) Close() error {
	c.cancel()
	c.wg.Wait()
	c.factory.Shutdown()
	return nil
}

func (c *coordinator) Write(ctx context.Context, wr *prompb.WriteRequest) error {
	return c.tsw(ctx, wr.Timeseries)
}

// syncPeers determines the active set of ingestors and reconfigures the partitioner.
func (c *coordinator) syncPeers() error {
	c.mu.Lock()
	defer c.mu.Unlock()

	pods, err := c.pl.Pods(c.namespace).List(labels.Everything())
	if err != nil {
		return fmt.Errorf("list pods: %w", err)
	}

	var leastNode string
	set := make(map[string]string, len(c.peers))
	for _, p := range pods {
		if p.Status.PodIP == "" {
			continue
		}

		if !c.isPeer(p) {
			continue
		}

		ready := IsPodReady(p)
		if !ready {
			continue
		}

		set[p.Name] = fmt.Sprintf("https://%s:9090", p.Status.PodIP)

		// The leader is the ready peer with the lexicographically smallest pod name.
		if p.Name < leastNode || leastNode == "" {
			leastNode = p.Name
		}
	}

	c.leader = c.opts.Hostname == leastNode

	if len(set) == 0 {
		return nil
	}

	if err := c.setPartitioner(set); err != nil {
		return err
	}

	return nil
}

// setPartitioner replaces the peer set and rebuilds the partitioner from it.
func (c *coordinator) setPartitioner(set map[string]string) error {
	c.peers = make(map[string]string, len(set))
	for peer, addr := range set {
		c.peers[peer] = addr
	}

	part, err := NewPartition(set)
	if err != nil {
		return err
	}
	c.part = part
	return nil
}

// resyncPeers periodically re-syncs the peer set in case informer events are missed.
func (c *coordinator) resyncPeers(ctx context.Context) {
	defer c.wg.Done()

	t := time.NewTicker(time.Minute)
	defer t.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-t.C:
			if err := c.syncPeers(); err != nil {
				logger.Errorf("Failed to reconfigure peers: %s", err)
			}

			if logger.IsDebug() {
				c.mu.RLock()
				for peer, addr := range c.peers {
					logger.Debugf("Peers updated %s addr=%s ready=%v", peer, addr, "true")
				}
				c.mu.RUnlock()
			}
		}
	}
}

// GetOutboundIP returns the preferred outbound IP of this machine.
func GetOutboundIP() (net.IP, error) {
	conn, err := net.Dial("udp", "169.254.169.25:80")
	if err != nil {
		if strings.Contains(err.Error(), "network is unreachable") {
			return net.IPv4(127, 0, 0, 1), nil
		}
		return nil, err
	}
	defer conn.Close()

	localAddr := conn.LocalAddr().(*net.UDPAddr)

	return localAddr.IP, nil
}

// IsPodReady returns true if all containers in a pod are in a ready state.
func IsPodReady(pod *v1.Pod) bool {
	if !IsInitReady(pod) {
		return false
	}

	for _, cond := range pod.Status.Conditions {
		if cond.Type == v1.PodReady && cond.Status == v1.ConditionTrue {
			return true
		}
	}
	return false
}

// IsInitReady returns true if all init containers are in a ready state.
func IsInitReady(pod *v1.Pod) bool {
	for _, cond := range pod.Status.Conditions {
		if cond.Type == v1.PodInitialized && cond.Status == v1.ConditionTrue {
			return true
		}
	}
	return false
}
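
// Example wiring (illustrative sketch only, not part of this package's API):
// a caller is assumed to build a Kubernetes client from in-cluster config,
// pass the pod's StatefulSet hostname (e.g. ingestor-0) so peer discovery can
// derive the group name, and then Open the coordinator with a cancellable
// context. The environment variable names and the no-op write function below
// are assumptions chosen for illustration, not values defined in this file.
//
//	cfg, err := rest.InClusterConfig()
//	if err != nil {
//		return err
//	}
//	kcli, err := kubernetes.NewForConfig(cfg)
//	if err != nil {
//		return err
//	}
//	coord, err := NewCoordinator(&CoordinatorOpts{
//		WriteTimeSeriesFn: func(ctx context.Context, ts []*prompb.TimeSeries) error { return nil },
//		K8sCli:            kcli,
//		Namespace:         os.Getenv("POD_NAMESPACE"), // assumed downward-API env var
//		Hostname:          os.Getenv("HOSTNAME"),      // StatefulSet pod name, e.g. ingestor-0
//	})
//	if err != nil {
//		return err
//	}
//	if err := coord.Open(ctx); err != nil {
//		return err
//	}
//	defer coord.Close()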