pkg/monitor/cache.go (46 lines of code) (raw):
package monitor
// Copyright (c) Microsoft Corporation.
// Licensed under the Apache License 2.0.
import (
"math/rand"
"time"
"github.com/Azure/ARO-RP/pkg/api"
)
type cacheDoc struct {
doc *api.OpenShiftClusterDocument
stop chan<- struct{}
}
// deleteDoc deletes the given document from mon.docs, signalling the associated
// monitoring goroutine to stop if it exists. Caller must hold mon.mu.Lock.
func (mon *monitor) deleteDoc(doc *api.OpenShiftClusterDocument) {
v := mon.docs[doc.ID]
if v != nil {
if v.stop != nil {
close(mon.docs[doc.ID].stop)
}
delete(mon.docs, doc.ID)
}
}
// upsertDoc inserts or updates the given document into mon.docs, starting an
// associated monitoring goroutine if the document is in a bucket owned by us.
// Caller must hold mon.mu.Lock.
func (mon *monitor) upsertDoc(doc *api.OpenShiftClusterDocument) {
v := mon.docs[doc.ID]
if v == nil {
v = &cacheDoc{}
mon.docs[doc.ID] = v
}
v.doc = doc
mon.fixDoc(doc)
}
// fixDocs ensures that there is a monitoring goroutine for all documents in all
// buckets owned by us. Caller must hold mon.mu.Lock.
func (mon *monitor) fixDocs() {
for _, v := range mon.docs {
mon.fixDoc(v.doc)
}
}
// fixDoc ensures that there is a monitoring goroutine for the given document
// iff it is in a bucket owned by us. Caller must hold mon.mu.Lock.
func (mon *monitor) fixDoc(doc *api.OpenShiftClusterDocument) {
v := mon.docs[doc.ID]
_, ours := mon.buckets[v.doc.Bucket]
if !ours && v.stop != nil {
close(v.stop)
v.stop = nil
} else if ours && v.stop == nil {
ch := make(chan struct{})
v.stop = ch
delay := time.Duration(rand.Intn(60)) * time.Second
go mon.worker(ch, delay, doc.ID)
}
}