metric.go (196 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package docappender
import (
"fmt"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/metric"
)
type metrics struct {
bufferDuration metric.Float64Histogram
flushDuration metric.Float64Histogram
bulkRequests metric.Int64Counter
docsAdded metric.Int64Counter
docsActive metric.Int64UpDownCounter
docsIndexed metric.Int64Counter
docsRetried metric.Int64Counter
bytesTotal metric.Int64Counter
bytesUncompressedTotal metric.Int64Counter
availableBulkRequests metric.Int64UpDownCounter
inflightBulkrequests metric.Int64UpDownCounter
activeCreated metric.Int64Counter
activeDestroyed metric.Int64Counter
blockedAdd metric.Int64Counter
}
type histogramMetric struct {
name string
description string
unit string
p *metric.Float64Histogram
}
type counterMetric struct {
name string
description string
unit string
p *metric.Int64Counter
}
type upDownCounterMetric struct {
name string
description string
unit string
p *metric.Int64UpDownCounter
}
func newMetrics(cfg Config) (metrics, error) {
if cfg.MeterProvider == nil {
cfg.MeterProvider = otel.GetMeterProvider()
}
meter := cfg.MeterProvider.Meter("github.com/elastic/go-docappender")
ms := metrics{}
histograms := []histogramMetric{
{
name: "elasticsearch.buffer.latency",
description: "The amount of time a document was buffered for, in seconds.",
unit: "s",
p: &ms.bufferDuration,
},
{
name: "elasticsearch.flushed.latency",
description: "The amount of time a _bulk request took, in seconds.",
unit: "s",
p: &ms.flushDuration,
},
}
for _, m := range histograms {
err := newFloat64Histogram(meter, m)
if err != nil {
return ms, err
}
}
counters := []counterMetric{
{
name: "elasticsearch.bulk_requests.count",
description: "The number of bulk requests completed.",
p: &ms.bulkRequests,
},
{
name: "elasticsearch.events.count",
description: "Number of APM Events received for indexing",
p: &ms.docsAdded,
},
{
name: "elasticsearch.events.processed",
description: "Number of APM Events flushed to Elasticsearch. Attributes are used to report separate counts for different outcomes - success, client failure, etc.",
p: &ms.docsIndexed,
},
{
name: "elasticsearch.events.retried",
description: "The number of document retries. A single document may be retried more than once.",
p: &ms.docsRetried,
},
{
name: "elasticsearch.flushed.bytes",
description: "The total number of bytes written to the request body",
unit: "by",
p: &ms.bytesTotal,
},
{
name: "elasticsearch.flushed.uncompressed.bytes",
description: "The total number of uncompressed bytes written to the request body",
unit: "by",
p: &ms.bytesUncompressedTotal,
},
{
name: "elasticsearch.indexer.created",
description: "The number of active indexer creations.",
p: &ms.activeCreated,
},
{
name: "elasticsearch.indexer.destroyed",
description: "The number of times an active indexer was destroyed.",
p: &ms.activeDestroyed,
},
{
name: "docappender.blocked.add",
description: "The number of times Add could block due to exhausted capacity in bulkItems channel",
p: &ms.blockedAdd,
},
}
for _, m := range counters {
err := newInt64Counter(meter, m)
if err != nil {
return ms, err
}
}
upDownCounters := []upDownCounterMetric{
{
name: "elasticsearch.events.queued",
description: "the number of active items waiting in the indexer's queue.",
p: &ms.docsActive,
},
{
name: "elasticsearch.bulk_requests.available",
description: "The number of bulk indexers available for making bulk index requests.",
p: &ms.availableBulkRequests,
},
{
name: "elasticsearch.bulk_requests.inflight",
description: "The number of in-flight bulk requests being made to Elasticsearch.",
p: &ms.inflightBulkrequests,
},
}
for _, m := range upDownCounters {
err := newInt64UpDownCounter(meter, m)
if err != nil {
return ms, err
}
}
return ms, nil
}
func newInt64Counter(meter metric.Meter, c counterMetric) error {
unit := c.unit
if unit == "" {
unit = "1"
}
m, err := meter.Int64Counter(
c.name,
metric.WithUnit(unit),
metric.WithDescription(c.description),
)
if err != nil {
return fmt.Errorf(
"failed creating %s metric: %w", c.name, err,
)
}
*c.p = m
return nil
}
func newInt64UpDownCounter(meter metric.Meter, c upDownCounterMetric) error {
unit := c.unit
if unit == "" {
unit = "1"
}
m, err := meter.Int64UpDownCounter(
c.name,
metric.WithUnit(unit),
metric.WithDescription(c.description),
)
if err != nil {
return fmt.Errorf(
"failed creating %s metric: %w", c.name, err,
)
}
*c.p = m
return nil
}
func newFloat64Histogram(meter metric.Meter, h histogramMetric) error {
m, err := meter.Float64Histogram(
h.name,
metric.WithUnit(h.unit),
metric.WithDescription(h.description),
)
if err != nil {
return fmt.Errorf(
"failed creating %s metric: %w", h.name, err,
)
}
*h.p = m
return nil
}