internal/agentcfg/reporter.go (78 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package agentcfg
import (
"context"
"sync"
"time"
"github.com/elastic/apm-data/model/modelpb"
"github.com/elastic/elastic-agent-libs/logp"
)
type Reporter struct {
f Fetcher
p modelpb.BatchProcessor
interval time.Duration
logger *logp.Logger
resultc chan Result
}
func NewReporter(f Fetcher, batchProcessor modelpb.BatchProcessor, interval time.Duration, logger *logp.Logger) Reporter {
return Reporter{
f: f,
p: batchProcessor,
interval: interval,
logger: logger.Named("agentcfg"),
resultc: make(chan Result),
}
}
func (r Reporter) Fetch(ctx context.Context, query Query) (Result, error) {
result, err := r.f.Fetch(ctx, query)
if err != nil {
return Result{}, err
}
// Only report configs when the query etag == current config etag, or
// when the agent indicates it has been applied.
if result.Source.Etag != EtagSentinel && (query.Etag == result.Source.Etag || query.MarkAsAppliedByAgent) {
select {
case <-ctx.Done():
return Result{}, ctx.Err()
case r.resultc <- result:
}
}
return result, err
}
func (r Reporter) Run(ctx context.Context) error {
var wg sync.WaitGroup
defer wg.Wait()
// applied tracks the etags of agent config that has been applied.
applied := make(map[string]struct{})
t := time.NewTicker(r.interval)
defer t.Stop()
for {
select {
case <-ctx.Done():
return ctx.Err()
case result := <-r.resultc:
if _, ok := applied[result.Source.Etag]; !ok {
applied[result.Source.Etag] = struct{}{}
}
continue
case <-t.C:
}
batch := make(modelpb.Batch, 0, len(applied))
for etag := range applied {
batch = append(batch, &modelpb.APMEvent{
Timestamp: modelpb.FromTime(time.Now()),
Labels: modelpb.Labels{"etag": {Value: etag}},
Metricset: &modelpb.Metricset{
Name: "agent_config",
Samples: []*modelpb.MetricsetSample{
{Name: "agent_config_applied", Value: 1},
},
},
})
}
// Reset applied map, so that we report only configs applied
// during a given iteration.
applied = make(map[string]struct{})
wg.Add(1)
go func() {
defer wg.Done()
if err := r.p.ProcessBatch(ctx, &batch); err != nil {
r.logger.Errorf("error sending applied agent configs to kibana: %v", err)
}
}()
}
}