monitoring/report/buffer/reporter.go (79 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package buffer import ( "encoding/json" "fmt" "net/http" "sync" "time" c "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/monitoring" ) // reporter is a struct that will fill a ring buffer for each monitored registry. type reporter struct { config wg sync.WaitGroup done chan struct{} registries map[string]*monitoring.Registry // ring buffers for namespaces entries map[string]*ringBuffer } // MakeReporter creates and starts a reporter with the given config. func MakeReporter(cfg *c.C) (*reporter, error) { config := defaultConfig() if cfg != nil { if err := cfg.Unpack(&config); err != nil { return nil, err } } r := &reporter{ config: config, done: make(chan struct{}), registries: map[string]*monitoring.Registry{}, entries: map[string]*ringBuffer{}, } for _, ns := range r.config.Namespaces { reg := monitoring.GetNamespace(ns).GetRegistry() r.registries[ns] = reg r.entries[ns] = newBuffer(r.config.Size) } r.wg.Add(1) go func() { defer r.wg.Done() r.snapshotLoop() }() return r, nil } // Stop will stop the reporter from collecting new information. // It will not clear any previously collected data. func (r *reporter) Stop() { close(r.done) r.wg.Wait() } // snapshotLoop will collect a snapshot for each monitored registry for the configured period and store them in the correct buffer. func (r *reporter) snapshotLoop() { ticker := time.NewTicker(r.config.Period) defer ticker.Stop() for { var ts time.Time select { case <-r.done: return case ts = <-ticker.C: } for name, reg := range r.registries { snap := monitoring.CollectStructSnapshot(reg, monitoring.Full, false) if _, ok := snap["@timestamp"]; !ok { snap["@timestamp"] = ts.UTC() } r.entries[name].add(snap) } } } // ServeHTTP is an http.Handler that will respond with the monitored registries buffer's contents in JSON. func (r *reporter) ServeHTTP(w http.ResponseWriter, req *http.Request) { resp := make(map[string][]interface{}, len(r.entries)) for name, entries := range r.entries { resp[name] = entries.getAll() } p, err := json.Marshal(resp) if err != nil { w.WriteHeader(500) fmt.Fprintf(w, "Unable to encode JSON response: %v", err) return } w.Header().Set("Content-Type", "application/json; charset=utf-8") _, _ = w.Write(p) }