in pkg/esoutput/esoutput.go [230:261]
func (o *Output) flush() {
samplesContainers := o.GetBufferedSamples()
for _, samplesContainer := range samplesContainers {
samples := samplesContainer.GetSamples()
for _, sample := range samples {
mappedEntry := elasticMetricEntry{
MetricName: sample.Metric.Name,
MetricType: sample.Metric.Type.String(),
Value: sample.Value,
Tags: sample.GetTags().Map(),
Time: sample.Time,
}
data, err := json.Marshal(mappedEntry)
if err != nil {
o.logger.Fatalf("Cannot encode document: %s, %s", err, mappedEntry)
}
var item = esutil.BulkIndexerItem{
Action: "create",
Body: bytes.NewReader(data),
OnFailure: o.blkItemErrHandler,
}
err = o.bulkIndexer.Add(
context.Background(),
item,
)
if err != nil {
log.Fatalf("Unexpected error: %s", err)
}
}
}
}