pipeline/sources/heartbeat.go (66 lines of code) (raw):

// Copyright 2018 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // https://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package sources import ( "sync" "time" "github.com/GoogleCloudPlatform/ubbagent/clock" "github.com/GoogleCloudPlatform/ubbagent/config" "github.com/GoogleCloudPlatform/ubbagent/metrics" "github.com/GoogleCloudPlatform/ubbagent/pipeline" "github.com/golang/glog" ) type heartbeat struct { hb config.Heartbeat input pipeline.Input clock clock.Clock close chan bool wait sync.WaitGroup sdOnce sync.Once } func (h *heartbeat) Shutdown() (err error) { h.sdOnce.Do(func() { h.close <- true h.wait.Wait() err = h.input.Release() }) return } func (h *heartbeat) run(start time.Time) { interval := time.Duration(h.hb.IntervalSeconds) * time.Second end := start.Add(interval) running := true for running { now := h.clock.Now() nextFire := now.Add(end.Sub(now)) timer := h.clock.NewTimerAt(nextFire) select { case <-timer.GetC(): report := metrics.MetricReport{ Name: h.hb.Metric, StartTime: start, EndTime: end, Value: h.hb.Value, Labels: h.hb.Labels, } err := h.input.AddReport(report.Copy()) if err != nil { glog.Errorf("heartbeat: error sending report: %+v", err) } start = end end = end.Add(interval) case <-h.close: running = false } timer.Stop() } h.wait.Done() } func newHeartbeat(hb config.Heartbeat, input pipeline.Input, clock clock.Clock) pipeline.Source { input.Use() c := &heartbeat{hb: hb, input: input, clock: clock, close: make(chan bool, 1)} c.wait.Add(1) go c.run(clock.Now().UTC().Round(1 * time.Second)) return c } func NewHeartbeat(hb config.Heartbeat, input pipeline.Input) pipeline.Source { return newHeartbeat(hb, input, clock.NewClock()) }