internal/pkg/policy/sub.go (89 lines of code) (raw):

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. package policy import ( "github.com/elastic/fleet-server/v7/internal/pkg/model" ) type subT struct { policyID string agentID string // not logically necessary; cached for logging revIdx int64 next *subT prev *subT ch chan *ParsedPolicy } func NewSub(policyID, agentID string, revIdx int64) *subT { return &subT{ policyID: policyID, agentID: agentID, revIdx: revIdx, ch: make(chan *ParsedPolicy, 1), } } func makeHead() *subT { sub := &subT{} sub.next = sub sub.prev = sub return sub } func (n *subT) pushFront(nn *subT) { nn.next = n.next nn.prev = n n.next.prev = nn n.next = nn } func (n *subT) pushBack(nn *subT) { nn.next = n nn.prev = n.prev n.prev.next = nn n.prev = nn } func (n *subT) popFront() *subT { if n.next == n { return nil } s := n.next s.unlink() return s } func (n *subT) unlink() bool { //nolint:unparam // useful to return this if we ever test if n.next == nil || n.prev == nil { return false } n.prev.next = n.next n.next.prev = n.prev n.next = nil n.prev = nil return true } func (n *subT) isEmpty() bool { return n.next == n } func (n *subT) isUpdate(policy *model.Policy) bool { pRevIdx := policy.RevisionIdx return pRevIdx > n.revIdx } // Output returns a new policy that needs to be sent based on the current subscription. func (n *subT) Output() <-chan *ParsedPolicy { return n.ch } type subIterT struct { head *subT cur *subT } func NewIterator(head *subT) *subIterT { return &subIterT{ head: head, cur: head, } } func (it *subIterT) Next() *subT { next := it.cur.next if next == it.head { return nil } it.cur = next return next } func (it *subIterT) Unlink() { prev := it.cur.prev it.cur.unlink() it.cur = prev }