// receiver/flinkmetricsreceiver/internal/metadata/generated_metrics.go
// Code generated by mdatagen. DO NOT EDIT.
package metadata
import (
"fmt"
"strconv"
"time"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/filter"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/receiver"
)
// AttributeCheckpoint specifies the value checkpoint attribute.
type AttributeCheckpoint int

const (
	_ AttributeCheckpoint = iota
	AttributeCheckpointCompleted
	AttributeCheckpointFailed
)

// String returns the string representation of the AttributeCheckpoint.
// Values outside the defined range yield the empty string.
func (av AttributeCheckpoint) String() string {
	names := [...]string{"", "completed", "failed"}
	if av > 0 && int(av) < len(names) {
		return names[av]
	}
	return ""
}

// MapAttributeCheckpoint is a helper map of string to AttributeCheckpoint attribute value.
var MapAttributeCheckpoint = map[string]AttributeCheckpoint{
	"completed": AttributeCheckpointCompleted,
	"failed":    AttributeCheckpointFailed,
}
// AttributeGarbageCollectorName specifies the value garbage_collector_name attribute.
type AttributeGarbageCollectorName int

const (
	_ AttributeGarbageCollectorName = iota
	AttributeGarbageCollectorNamePSMarkSweep
	AttributeGarbageCollectorNamePSScavenge
	AttributeGarbageCollectorNameG1YoungGeneration
	AttributeGarbageCollectorNameG1OldGeneration
)

// String returns the string representation of the AttributeGarbageCollectorName.
// Values outside the defined range yield the empty string.
func (av AttributeGarbageCollectorName) String() string {
	names := [...]string{"", "PS_MarkSweep", "PS_Scavenge", "G1_Young_Generation", "G1_Old_Generation"}
	if av > 0 && int(av) < len(names) {
		return names[av]
	}
	return ""
}

// MapAttributeGarbageCollectorName is a helper map of string to AttributeGarbageCollectorName attribute value.
var MapAttributeGarbageCollectorName = map[string]AttributeGarbageCollectorName{
	"PS_MarkSweep":        AttributeGarbageCollectorNamePSMarkSweep,
	"PS_Scavenge":         AttributeGarbageCollectorNamePSScavenge,
	"G1_Young_Generation": AttributeGarbageCollectorNameG1YoungGeneration,
	"G1_Old_Generation":   AttributeGarbageCollectorNameG1OldGeneration,
}
// AttributeRecord specifies the value record attribute.
type AttributeRecord int

const (
	_ AttributeRecord = iota
	AttributeRecordIn
	AttributeRecordOut
	AttributeRecordDropped
)

// String returns the string representation of the AttributeRecord.
// Values outside the defined range yield the empty string.
func (av AttributeRecord) String() string {
	names := [...]string{"", "in", "out", "dropped"}
	if av > 0 && int(av) < len(names) {
		return names[av]
	}
	return ""
}

// MapAttributeRecord is a helper map of string to AttributeRecord attribute value.
var MapAttributeRecord = map[string]AttributeRecord{
	"in":      AttributeRecordIn,
	"out":     AttributeRecordOut,
	"dropped": AttributeRecordDropped,
}
// MetricsInfo lists the canonical name of every metric emitted by this
// receiver, keyed by its generated identifier.
var MetricsInfo = metricsInfo{
	FlinkJobCheckpointCount: metricInfo{
		Name: "flink.job.checkpoint.count",
	},
	FlinkJobCheckpointInProgress: metricInfo{
		Name: "flink.job.checkpoint.in_progress",
	},
	FlinkJobLastCheckpointSize: metricInfo{
		Name: "flink.job.last_checkpoint.size",
	},
	FlinkJobLastCheckpointTime: metricInfo{
		Name: "flink.job.last_checkpoint.time",
	},
	FlinkJobRestartCount: metricInfo{
		Name: "flink.job.restart.count",
	},
	FlinkJvmClassLoaderClassesLoaded: metricInfo{
		Name: "flink.jvm.class_loader.classes_loaded",
	},
	FlinkJvmCPULoad: metricInfo{
		Name: "flink.jvm.cpu.load",
	},
	FlinkJvmCPUTime: metricInfo{
		Name: "flink.jvm.cpu.time",
	},
	FlinkJvmGcCollectionsCount: metricInfo{
		Name: "flink.jvm.gc.collections.count",
	},
	FlinkJvmGcCollectionsTime: metricInfo{
		Name: "flink.jvm.gc.collections.time",
	},
	FlinkJvmMemoryDirectTotalCapacity: metricInfo{
		Name: "flink.jvm.memory.direct.total_capacity",
	},
	FlinkJvmMemoryDirectUsed: metricInfo{
		Name: "flink.jvm.memory.direct.used",
	},
	FlinkJvmMemoryHeapCommitted: metricInfo{
		Name: "flink.jvm.memory.heap.committed",
	},
	FlinkJvmMemoryHeapMax: metricInfo{
		Name: "flink.jvm.memory.heap.max",
	},
	FlinkJvmMemoryHeapUsed: metricInfo{
		Name: "flink.jvm.memory.heap.used",
	},
	FlinkJvmMemoryMappedTotalCapacity: metricInfo{
		Name: "flink.jvm.memory.mapped.total_capacity",
	},
	FlinkJvmMemoryMappedUsed: metricInfo{
		Name: "flink.jvm.memory.mapped.used",
	},
	FlinkJvmMemoryMetaspaceCommitted: metricInfo{
		Name: "flink.jvm.memory.metaspace.committed",
	},
	FlinkJvmMemoryMetaspaceMax: metricInfo{
		Name: "flink.jvm.memory.metaspace.max",
	},
	FlinkJvmMemoryMetaspaceUsed: metricInfo{
		Name: "flink.jvm.memory.metaspace.used",
	},
	FlinkJvmMemoryNonheapCommitted: metricInfo{
		Name: "flink.jvm.memory.nonheap.committed",
	},
	FlinkJvmMemoryNonheapMax: metricInfo{
		Name: "flink.jvm.memory.nonheap.max",
	},
	FlinkJvmMemoryNonheapUsed: metricInfo{
		Name: "flink.jvm.memory.nonheap.used",
	},
	FlinkJvmThreadsCount: metricInfo{
		Name: "flink.jvm.threads.count",
	},
	FlinkMemoryManagedTotal: metricInfo{
		Name: "flink.memory.managed.total",
	},
	FlinkMemoryManagedUsed: metricInfo{
		Name: "flink.memory.managed.used",
	},
	FlinkOperatorRecordCount: metricInfo{
		Name: "flink.operator.record.count",
	},
	FlinkOperatorWatermarkOutput: metricInfo{
		Name: "flink.operator.watermark.output",
	},
	FlinkTaskRecordCount: metricInfo{
		Name: "flink.task.record.count",
	},
}
// metricsInfo groups one metricInfo entry per metric defined in the
// receiver's metadata; it is the type of the exported MetricsInfo value.
type metricsInfo struct {
	FlinkJobCheckpointCount           metricInfo
	FlinkJobCheckpointInProgress      metricInfo
	FlinkJobLastCheckpointSize        metricInfo
	FlinkJobLastCheckpointTime        metricInfo
	FlinkJobRestartCount              metricInfo
	FlinkJvmClassLoaderClassesLoaded  metricInfo
	FlinkJvmCPULoad                   metricInfo
	FlinkJvmCPUTime                   metricInfo
	FlinkJvmGcCollectionsCount        metricInfo
	FlinkJvmGcCollectionsTime         metricInfo
	FlinkJvmMemoryDirectTotalCapacity metricInfo
	FlinkJvmMemoryDirectUsed          metricInfo
	FlinkJvmMemoryHeapCommitted       metricInfo
	FlinkJvmMemoryHeapMax             metricInfo
	FlinkJvmMemoryHeapUsed            metricInfo
	FlinkJvmMemoryMappedTotalCapacity metricInfo
	FlinkJvmMemoryMappedUsed          metricInfo
	FlinkJvmMemoryMetaspaceCommitted  metricInfo
	FlinkJvmMemoryMetaspaceMax        metricInfo
	FlinkJvmMemoryMetaspaceUsed       metricInfo
	FlinkJvmMemoryNonheapCommitted    metricInfo
	FlinkJvmMemoryNonheapMax          metricInfo
	FlinkJvmMemoryNonheapUsed         metricInfo
	FlinkJvmThreadsCount              metricInfo
	FlinkMemoryManagedTotal           metricInfo
	FlinkMemoryManagedUsed            metricInfo
	FlinkOperatorRecordCount          metricInfo
	FlinkOperatorWatermarkOutput      metricInfo
	FlinkTaskRecordCount              metricInfo
}
// metricInfo describes a single metric; currently only its canonical name.
type metricInfo struct {
	Name string
}
// metricFlinkJobCheckpointCount buffers data points for the
// flink.job.checkpoint.count metric until emit is called.
type metricFlinkJobCheckpointCount struct {
	data     pmetric.Metric // data buffer for generated metric.
	config   MetricConfig   // metric config provided by user.
	capacity int            // max observed number of data points added to the metric.
}

// init fills flink.job.checkpoint.count metric with initial data.
func (m *metricFlinkJobCheckpointCount) init() {
	m.data.SetName("flink.job.checkpoint.count")
	m.data.SetDescription("The number of checkpoints completed or failed.")
	m.data.SetUnit("{checkpoints}")
	m.data.SetEmptySum()
	m.data.Sum().SetIsMonotonic(true)
	m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
	m.data.Sum().DataPoints().EnsureCapacity(m.capacity)
}

// recordDataPoint appends one data point with the given timestamps, value,
// and "checkpoint" attribute; it is a no-op when the metric is disabled.
func (m *metricFlinkJobCheckpointCount) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64, checkpointAttributeValue string) {
	if !m.config.Enabled {
		return
	}
	dp := m.data.Sum().DataPoints().AppendEmpty()
	dp.SetStartTimestamp(start)
	dp.SetTimestamp(ts)
	dp.SetIntValue(val)
	dp.Attributes().PutStr("checkpoint", checkpointAttributeValue)
}

// updateCapacity saves max length of data point slices that will be used for the slice capacity.
func (m *metricFlinkJobCheckpointCount) updateCapacity() {
	if m.data.Sum().DataPoints().Len() > m.capacity {
		m.capacity = m.data.Sum().DataPoints().Len()
	}
}

// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
func (m *metricFlinkJobCheckpointCount) emit(metrics pmetric.MetricSlice) {
	if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 {
		m.updateCapacity()
		m.data.MoveTo(metrics.AppendEmpty())
		m.init()
	}
}

// newMetricFlinkJobCheckpointCount builds the metric; the data buffer is
// initialized only when the metric is enabled in config.
func newMetricFlinkJobCheckpointCount(cfg MetricConfig) metricFlinkJobCheckpointCount {
	m := metricFlinkJobCheckpointCount{config: cfg}
	if cfg.Enabled {
		m.data = pmetric.NewMetric()
		m.init()
	}
	return m
}
// metricFlinkJobCheckpointInProgress buffers data points for the
// flink.job.checkpoint.in_progress metric until emit is called.
type metricFlinkJobCheckpointInProgress struct {
	data     pmetric.Metric // data buffer for generated metric.
	config   MetricConfig   // metric config provided by user.
	capacity int            // max observed number of data points added to the metric.
}

// init fills flink.job.checkpoint.in_progress metric with initial data.
func (m *metricFlinkJobCheckpointInProgress) init() {
	m.data.SetName("flink.job.checkpoint.in_progress")
	m.data.SetDescription("The number of checkpoints in progress.")
	m.data.SetUnit("{checkpoints}")
	m.data.SetEmptySum()
	m.data.Sum().SetIsMonotonic(false)
	m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
}

// recordDataPoint appends one data point with the given timestamps and value;
// it is a no-op when the metric is disabled.
func (m *metricFlinkJobCheckpointInProgress) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) {
	if !m.config.Enabled {
		return
	}
	dp := m.data.Sum().DataPoints().AppendEmpty()
	dp.SetStartTimestamp(start)
	dp.SetTimestamp(ts)
	dp.SetIntValue(val)
}

// updateCapacity saves max length of data point slices that will be used for the slice capacity.
func (m *metricFlinkJobCheckpointInProgress) updateCapacity() {
	if m.data.Sum().DataPoints().Len() > m.capacity {
		m.capacity = m.data.Sum().DataPoints().Len()
	}
}

// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
func (m *metricFlinkJobCheckpointInProgress) emit(metrics pmetric.MetricSlice) {
	if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 {
		m.updateCapacity()
		m.data.MoveTo(metrics.AppendEmpty())
		m.init()
	}
}

// newMetricFlinkJobCheckpointInProgress builds the metric; the data buffer is
// initialized only when the metric is enabled in config.
func newMetricFlinkJobCheckpointInProgress(cfg MetricConfig) metricFlinkJobCheckpointInProgress {
	m := metricFlinkJobCheckpointInProgress{config: cfg}
	if cfg.Enabled {
		m.data = pmetric.NewMetric()
		m.init()
	}
	return m
}
// metricFlinkJobLastCheckpointSize buffers data points for the
// flink.job.last_checkpoint.size metric until emit is called.
type metricFlinkJobLastCheckpointSize struct {
	data     pmetric.Metric // data buffer for generated metric.
	config   MetricConfig   // metric config provided by user.
	capacity int            // max observed number of data points added to the metric.
}

// init fills flink.job.last_checkpoint.size metric with initial data.
func (m *metricFlinkJobLastCheckpointSize) init() {
	m.data.SetName("flink.job.last_checkpoint.size")
	m.data.SetDescription("The total size of the last checkpoint.")
	m.data.SetUnit("By")
	m.data.SetEmptySum()
	m.data.Sum().SetIsMonotonic(false)
	m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
}

// recordDataPoint appends one data point with the given timestamps and value;
// it is a no-op when the metric is disabled.
func (m *metricFlinkJobLastCheckpointSize) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) {
	if !m.config.Enabled {
		return
	}
	dp := m.data.Sum().DataPoints().AppendEmpty()
	dp.SetStartTimestamp(start)
	dp.SetTimestamp(ts)
	dp.SetIntValue(val)
}

// updateCapacity saves max length of data point slices that will be used for the slice capacity.
func (m *metricFlinkJobLastCheckpointSize) updateCapacity() {
	if m.data.Sum().DataPoints().Len() > m.capacity {
		m.capacity = m.data.Sum().DataPoints().Len()
	}
}

// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
func (m *metricFlinkJobLastCheckpointSize) emit(metrics pmetric.MetricSlice) {
	if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 {
		m.updateCapacity()
		m.data.MoveTo(metrics.AppendEmpty())
		m.init()
	}
}

// newMetricFlinkJobLastCheckpointSize builds the metric; the data buffer is
// initialized only when the metric is enabled in config.
func newMetricFlinkJobLastCheckpointSize(cfg MetricConfig) metricFlinkJobLastCheckpointSize {
	m := metricFlinkJobLastCheckpointSize{config: cfg}
	if cfg.Enabled {
		m.data = pmetric.NewMetric()
		m.init()
	}
	return m
}
// metricFlinkJobLastCheckpointTime buffers data points for the
// flink.job.last_checkpoint.time metric (a gauge) until emit is called.
type metricFlinkJobLastCheckpointTime struct {
	data     pmetric.Metric // data buffer for generated metric.
	config   MetricConfig   // metric config provided by user.
	capacity int            // max observed number of data points added to the metric.
}

// init fills flink.job.last_checkpoint.time metric with initial data.
func (m *metricFlinkJobLastCheckpointTime) init() {
	m.data.SetName("flink.job.last_checkpoint.time")
	m.data.SetDescription("The end to end duration of the last checkpoint.")
	m.data.SetUnit("ms")
	m.data.SetEmptyGauge()
}

// recordDataPoint appends one data point with the given timestamps and value;
// it is a no-op when the metric is disabled.
func (m *metricFlinkJobLastCheckpointTime) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) {
	if !m.config.Enabled {
		return
	}
	dp := m.data.Gauge().DataPoints().AppendEmpty()
	dp.SetStartTimestamp(start)
	dp.SetTimestamp(ts)
	dp.SetIntValue(val)
}

// updateCapacity saves max length of data point slices that will be used for the slice capacity.
func (m *metricFlinkJobLastCheckpointTime) updateCapacity() {
	if m.data.Gauge().DataPoints().Len() > m.capacity {
		m.capacity = m.data.Gauge().DataPoints().Len()
	}
}

// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
func (m *metricFlinkJobLastCheckpointTime) emit(metrics pmetric.MetricSlice) {
	if m.config.Enabled && m.data.Gauge().DataPoints().Len() > 0 {
		m.updateCapacity()
		m.data.MoveTo(metrics.AppendEmpty())
		m.init()
	}
}

// newMetricFlinkJobLastCheckpointTime builds the metric; the data buffer is
// initialized only when the metric is enabled in config.
func newMetricFlinkJobLastCheckpointTime(cfg MetricConfig) metricFlinkJobLastCheckpointTime {
	m := metricFlinkJobLastCheckpointTime{config: cfg}
	if cfg.Enabled {
		m.data = pmetric.NewMetric()
		m.init()
	}
	return m
}
// metricFlinkJobRestartCount buffers data points for the
// flink.job.restart.count metric until emit is called.
type metricFlinkJobRestartCount struct {
	data     pmetric.Metric // data buffer for generated metric.
	config   MetricConfig   // metric config provided by user.
	capacity int            // max observed number of data points added to the metric.
}

// init fills flink.job.restart.count metric with initial data.
func (m *metricFlinkJobRestartCount) init() {
	m.data.SetName("flink.job.restart.count")
	m.data.SetDescription("The total number of restarts since this job was submitted, including full restarts and fine-grained restarts.")
	m.data.SetUnit("{restarts}")
	m.data.SetEmptySum()
	m.data.Sum().SetIsMonotonic(true)
	m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
}

// recordDataPoint appends one data point with the given timestamps and value;
// it is a no-op when the metric is disabled.
func (m *metricFlinkJobRestartCount) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) {
	if !m.config.Enabled {
		return
	}
	dp := m.data.Sum().DataPoints().AppendEmpty()
	dp.SetStartTimestamp(start)
	dp.SetTimestamp(ts)
	dp.SetIntValue(val)
}

// updateCapacity saves max length of data point slices that will be used for the slice capacity.
func (m *metricFlinkJobRestartCount) updateCapacity() {
	if m.data.Sum().DataPoints().Len() > m.capacity {
		m.capacity = m.data.Sum().DataPoints().Len()
	}
}

// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
func (m *metricFlinkJobRestartCount) emit(metrics pmetric.MetricSlice) {
	if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 {
		m.updateCapacity()
		m.data.MoveTo(metrics.AppendEmpty())
		m.init()
	}
}

// newMetricFlinkJobRestartCount builds the metric; the data buffer is
// initialized only when the metric is enabled in config.
func newMetricFlinkJobRestartCount(cfg MetricConfig) metricFlinkJobRestartCount {
	m := metricFlinkJobRestartCount{config: cfg}
	if cfg.Enabled {
		m.data = pmetric.NewMetric()
		m.init()
	}
	return m
}
// metricFlinkJvmClassLoaderClassesLoaded buffers data points for the
// flink.jvm.class_loader.classes_loaded metric until emit is called.
type metricFlinkJvmClassLoaderClassesLoaded struct {
	data     pmetric.Metric // data buffer for generated metric.
	config   MetricConfig   // metric config provided by user.
	capacity int            // max observed number of data points added to the metric.
}

// init fills flink.jvm.class_loader.classes_loaded metric with initial data.
func (m *metricFlinkJvmClassLoaderClassesLoaded) init() {
	m.data.SetName("flink.jvm.class_loader.classes_loaded")
	m.data.SetDescription("The total number of classes loaded since the start of the JVM.")
	m.data.SetUnit("{classes}")
	m.data.SetEmptySum()
	m.data.Sum().SetIsMonotonic(true)
	m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
}

// recordDataPoint appends one data point with the given timestamps and value;
// it is a no-op when the metric is disabled.
func (m *metricFlinkJvmClassLoaderClassesLoaded) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) {
	if !m.config.Enabled {
		return
	}
	dp := m.data.Sum().DataPoints().AppendEmpty()
	dp.SetStartTimestamp(start)
	dp.SetTimestamp(ts)
	dp.SetIntValue(val)
}

// updateCapacity saves max length of data point slices that will be used for the slice capacity.
func (m *metricFlinkJvmClassLoaderClassesLoaded) updateCapacity() {
	if m.data.Sum().DataPoints().Len() > m.capacity {
		m.capacity = m.data.Sum().DataPoints().Len()
	}
}

// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
func (m *metricFlinkJvmClassLoaderClassesLoaded) emit(metrics pmetric.MetricSlice) {
	if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 {
		m.updateCapacity()
		m.data.MoveTo(metrics.AppendEmpty())
		m.init()
	}
}

// newMetricFlinkJvmClassLoaderClassesLoaded builds the metric; the data buffer
// is initialized only when the metric is enabled in config.
func newMetricFlinkJvmClassLoaderClassesLoaded(cfg MetricConfig) metricFlinkJvmClassLoaderClassesLoaded {
	m := metricFlinkJvmClassLoaderClassesLoaded{config: cfg}
	if cfg.Enabled {
		m.data = pmetric.NewMetric()
		m.init()
	}
	return m
}
// metricFlinkJvmCPULoad buffers data points for the flink.jvm.cpu.load
// metric (a double-valued gauge) until emit is called.
type metricFlinkJvmCPULoad struct {
	data     pmetric.Metric // data buffer for generated metric.
	config   MetricConfig   // metric config provided by user.
	capacity int            // max observed number of data points added to the metric.
}

// init fills flink.jvm.cpu.load metric with initial data.
func (m *metricFlinkJvmCPULoad) init() {
	m.data.SetName("flink.jvm.cpu.load")
	m.data.SetDescription("The CPU usage of the JVM for a jobmanager or taskmanager.")
	m.data.SetUnit("%")
	m.data.SetEmptyGauge()
}

// recordDataPoint appends one data point with the given timestamps and value;
// it is a no-op when the metric is disabled.
func (m *metricFlinkJvmCPULoad) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val float64) {
	if !m.config.Enabled {
		return
	}
	dp := m.data.Gauge().DataPoints().AppendEmpty()
	dp.SetStartTimestamp(start)
	dp.SetTimestamp(ts)
	dp.SetDoubleValue(val)
}

// updateCapacity saves max length of data point slices that will be used for the slice capacity.
func (m *metricFlinkJvmCPULoad) updateCapacity() {
	if m.data.Gauge().DataPoints().Len() > m.capacity {
		m.capacity = m.data.Gauge().DataPoints().Len()
	}
}

// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
func (m *metricFlinkJvmCPULoad) emit(metrics pmetric.MetricSlice) {
	if m.config.Enabled && m.data.Gauge().DataPoints().Len() > 0 {
		m.updateCapacity()
		m.data.MoveTo(metrics.AppendEmpty())
		m.init()
	}
}

// newMetricFlinkJvmCPULoad builds the metric; the data buffer is initialized
// only when the metric is enabled in config.
func newMetricFlinkJvmCPULoad(cfg MetricConfig) metricFlinkJvmCPULoad {
	m := metricFlinkJvmCPULoad{config: cfg}
	if cfg.Enabled {
		m.data = pmetric.NewMetric()
		m.init()
	}
	return m
}
// metricFlinkJvmCPUTime buffers data points for the flink.jvm.cpu.time
// metric until emit is called.
type metricFlinkJvmCPUTime struct {
	data     pmetric.Metric // data buffer for generated metric.
	config   MetricConfig   // metric config provided by user.
	capacity int            // max observed number of data points added to the metric.
}

// init fills flink.jvm.cpu.time metric with initial data.
func (m *metricFlinkJvmCPUTime) init() {
	m.data.SetName("flink.jvm.cpu.time")
	m.data.SetDescription("The CPU time used by the JVM for a jobmanager or taskmanager.")
	m.data.SetUnit("ns")
	m.data.SetEmptySum()
	m.data.Sum().SetIsMonotonic(true)
	m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
}

// recordDataPoint appends one data point with the given timestamps and value;
// it is a no-op when the metric is disabled.
func (m *metricFlinkJvmCPUTime) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) {
	if !m.config.Enabled {
		return
	}
	dp := m.data.Sum().DataPoints().AppendEmpty()
	dp.SetStartTimestamp(start)
	dp.SetTimestamp(ts)
	dp.SetIntValue(val)
}

// updateCapacity saves max length of data point slices that will be used for the slice capacity.
func (m *metricFlinkJvmCPUTime) updateCapacity() {
	if m.data.Sum().DataPoints().Len() > m.capacity {
		m.capacity = m.data.Sum().DataPoints().Len()
	}
}

// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
func (m *metricFlinkJvmCPUTime) emit(metrics pmetric.MetricSlice) {
	if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 {
		m.updateCapacity()
		m.data.MoveTo(metrics.AppendEmpty())
		m.init()
	}
}

// newMetricFlinkJvmCPUTime builds the metric; the data buffer is initialized
// only when the metric is enabled in config.
func newMetricFlinkJvmCPUTime(cfg MetricConfig) metricFlinkJvmCPUTime {
	m := metricFlinkJvmCPUTime{config: cfg}
	if cfg.Enabled {
		m.data = pmetric.NewMetric()
		m.init()
	}
	return m
}
// metricFlinkJvmGcCollectionsCount buffers data points for the
// flink.jvm.gc.collections.count metric until emit is called.
type metricFlinkJvmGcCollectionsCount struct {
	data     pmetric.Metric // data buffer for generated metric.
	config   MetricConfig   // metric config provided by user.
	capacity int            // max observed number of data points added to the metric.
}

// init fills flink.jvm.gc.collections.count metric with initial data.
func (m *metricFlinkJvmGcCollectionsCount) init() {
	m.data.SetName("flink.jvm.gc.collections.count")
	m.data.SetDescription("The total number of collections that have occurred.")
	m.data.SetUnit("{collections}")
	m.data.SetEmptySum()
	m.data.Sum().SetIsMonotonic(true)
	m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
	m.data.Sum().DataPoints().EnsureCapacity(m.capacity)
}

// recordDataPoint appends one data point with the given timestamps, value, and
// the collector name under the "name" attribute key; no-op when disabled.
func (m *metricFlinkJvmGcCollectionsCount) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64, garbageCollectorNameAttributeValue string) {
	if !m.config.Enabled {
		return
	}
	dp := m.data.Sum().DataPoints().AppendEmpty()
	dp.SetStartTimestamp(start)
	dp.SetTimestamp(ts)
	dp.SetIntValue(val)
	dp.Attributes().PutStr("name", garbageCollectorNameAttributeValue)
}

// updateCapacity saves max length of data point slices that will be used for the slice capacity.
func (m *metricFlinkJvmGcCollectionsCount) updateCapacity() {
	if m.data.Sum().DataPoints().Len() > m.capacity {
		m.capacity = m.data.Sum().DataPoints().Len()
	}
}

// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
func (m *metricFlinkJvmGcCollectionsCount) emit(metrics pmetric.MetricSlice) {
	if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 {
		m.updateCapacity()
		m.data.MoveTo(metrics.AppendEmpty())
		m.init()
	}
}

// newMetricFlinkJvmGcCollectionsCount builds the metric; the data buffer is
// initialized only when the metric is enabled in config.
func newMetricFlinkJvmGcCollectionsCount(cfg MetricConfig) metricFlinkJvmGcCollectionsCount {
	m := metricFlinkJvmGcCollectionsCount{config: cfg}
	if cfg.Enabled {
		m.data = pmetric.NewMetric()
		m.init()
	}
	return m
}
// metricFlinkJvmGcCollectionsTime buffers data points for the
// flink.jvm.gc.collections.time metric until emit is called.
type metricFlinkJvmGcCollectionsTime struct {
	data     pmetric.Metric // data buffer for generated metric.
	config   MetricConfig   // metric config provided by user.
	capacity int            // max observed number of data points added to the metric.
}

// init fills flink.jvm.gc.collections.time metric with initial data.
func (m *metricFlinkJvmGcCollectionsTime) init() {
	m.data.SetName("flink.jvm.gc.collections.time")
	m.data.SetDescription("The total time spent performing garbage collection.")
	m.data.SetUnit("ms")
	m.data.SetEmptySum()
	m.data.Sum().SetIsMonotonic(true)
	m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
	m.data.Sum().DataPoints().EnsureCapacity(m.capacity)
}

// recordDataPoint appends one data point with the given timestamps, value, and
// the collector name under the "name" attribute key; no-op when disabled.
func (m *metricFlinkJvmGcCollectionsTime) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64, garbageCollectorNameAttributeValue string) {
	if !m.config.Enabled {
		return
	}
	dp := m.data.Sum().DataPoints().AppendEmpty()
	dp.SetStartTimestamp(start)
	dp.SetTimestamp(ts)
	dp.SetIntValue(val)
	dp.Attributes().PutStr("name", garbageCollectorNameAttributeValue)
}

// updateCapacity saves max length of data point slices that will be used for the slice capacity.
func (m *metricFlinkJvmGcCollectionsTime) updateCapacity() {
	if m.data.Sum().DataPoints().Len() > m.capacity {
		m.capacity = m.data.Sum().DataPoints().Len()
	}
}

// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
func (m *metricFlinkJvmGcCollectionsTime) emit(metrics pmetric.MetricSlice) {
	if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 {
		m.updateCapacity()
		m.data.MoveTo(metrics.AppendEmpty())
		m.init()
	}
}

// newMetricFlinkJvmGcCollectionsTime builds the metric; the data buffer is
// initialized only when the metric is enabled in config.
func newMetricFlinkJvmGcCollectionsTime(cfg MetricConfig) metricFlinkJvmGcCollectionsTime {
	m := metricFlinkJvmGcCollectionsTime{config: cfg}
	if cfg.Enabled {
		m.data = pmetric.NewMetric()
		m.init()
	}
	return m
}
// metricFlinkJvmMemoryDirectTotalCapacity buffers data points for the
// flink.jvm.memory.direct.total_capacity metric until emit is called.
type metricFlinkJvmMemoryDirectTotalCapacity struct {
	data     pmetric.Metric // data buffer for generated metric.
	config   MetricConfig   // metric config provided by user.
	capacity int            // max observed number of data points added to the metric.
}

// init fills flink.jvm.memory.direct.total_capacity metric with initial data.
func (m *metricFlinkJvmMemoryDirectTotalCapacity) init() {
	m.data.SetName("flink.jvm.memory.direct.total_capacity")
	m.data.SetDescription("The total capacity of all buffers in the direct buffer pool.")
	m.data.SetUnit("By")
	m.data.SetEmptySum()
	m.data.Sum().SetIsMonotonic(false)
	m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
}

// recordDataPoint appends one data point with the given timestamps and value;
// it is a no-op when the metric is disabled.
func (m *metricFlinkJvmMemoryDirectTotalCapacity) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) {
	if !m.config.Enabled {
		return
	}
	dp := m.data.Sum().DataPoints().AppendEmpty()
	dp.SetStartTimestamp(start)
	dp.SetTimestamp(ts)
	dp.SetIntValue(val)
}

// updateCapacity saves max length of data point slices that will be used for the slice capacity.
func (m *metricFlinkJvmMemoryDirectTotalCapacity) updateCapacity() {
	if m.data.Sum().DataPoints().Len() > m.capacity {
		m.capacity = m.data.Sum().DataPoints().Len()
	}
}

// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
func (m *metricFlinkJvmMemoryDirectTotalCapacity) emit(metrics pmetric.MetricSlice) {
	if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 {
		m.updateCapacity()
		m.data.MoveTo(metrics.AppendEmpty())
		m.init()
	}
}

// newMetricFlinkJvmMemoryDirectTotalCapacity builds the metric; the data
// buffer is initialized only when the metric is enabled in config.
func newMetricFlinkJvmMemoryDirectTotalCapacity(cfg MetricConfig) metricFlinkJvmMemoryDirectTotalCapacity {
	m := metricFlinkJvmMemoryDirectTotalCapacity{config: cfg}
	if cfg.Enabled {
		m.data = pmetric.NewMetric()
		m.init()
	}
	return m
}
// metricFlinkJvmMemoryDirectUsed buffers data points for the
// flink.jvm.memory.direct.used metric until emit is called.
type metricFlinkJvmMemoryDirectUsed struct {
	data     pmetric.Metric // data buffer for generated metric.
	config   MetricConfig   // metric config provided by user.
	capacity int            // max observed number of data points added to the metric.
}

// init fills flink.jvm.memory.direct.used metric with initial data.
func (m *metricFlinkJvmMemoryDirectUsed) init() {
	m.data.SetName("flink.jvm.memory.direct.used")
	m.data.SetDescription("The amount of memory used by the JVM for the direct buffer pool.")
	m.data.SetUnit("By")
	m.data.SetEmptySum()
	m.data.Sum().SetIsMonotonic(false)
	m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
}

// recordDataPoint appends one data point with the given timestamps and value;
// it is a no-op when the metric is disabled.
func (m *metricFlinkJvmMemoryDirectUsed) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) {
	if !m.config.Enabled {
		return
	}
	dp := m.data.Sum().DataPoints().AppendEmpty()
	dp.SetStartTimestamp(start)
	dp.SetTimestamp(ts)
	dp.SetIntValue(val)
}

// updateCapacity saves max length of data point slices that will be used for the slice capacity.
func (m *metricFlinkJvmMemoryDirectUsed) updateCapacity() {
	if m.data.Sum().DataPoints().Len() > m.capacity {
		m.capacity = m.data.Sum().DataPoints().Len()
	}
}

// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
func (m *metricFlinkJvmMemoryDirectUsed) emit(metrics pmetric.MetricSlice) {
	if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 {
		m.updateCapacity()
		m.data.MoveTo(metrics.AppendEmpty())
		m.init()
	}
}

// newMetricFlinkJvmMemoryDirectUsed builds the metric; the data buffer is
// initialized only when the metric is enabled in config.
func newMetricFlinkJvmMemoryDirectUsed(cfg MetricConfig) metricFlinkJvmMemoryDirectUsed {
	m := metricFlinkJvmMemoryDirectUsed{config: cfg}
	if cfg.Enabled {
		m.data = pmetric.NewMetric()
		m.init()
	}
	return m
}
// metricFlinkJvmMemoryHeapCommitted buffers data points for the
// flink.jvm.memory.heap.committed metric until emit is called.
type metricFlinkJvmMemoryHeapCommitted struct {
	data     pmetric.Metric // data buffer for generated metric.
	config   MetricConfig   // metric config provided by user.
	capacity int            // max observed number of data points added to the metric.
}

// init fills flink.jvm.memory.heap.committed metric with initial data.
func (m *metricFlinkJvmMemoryHeapCommitted) init() {
	m.data.SetName("flink.jvm.memory.heap.committed")
	m.data.SetDescription("The amount of heap memory guaranteed to be available to the JVM.")
	m.data.SetUnit("By")
	m.data.SetEmptySum()
	m.data.Sum().SetIsMonotonic(false)
	m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
}

// recordDataPoint appends one data point with the given timestamps and value;
// it is a no-op when the metric is disabled.
func (m *metricFlinkJvmMemoryHeapCommitted) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) {
	if !m.config.Enabled {
		return
	}
	dp := m.data.Sum().DataPoints().AppendEmpty()
	dp.SetStartTimestamp(start)
	dp.SetTimestamp(ts)
	dp.SetIntValue(val)
}

// updateCapacity saves max length of data point slices that will be used for the slice capacity.
func (m *metricFlinkJvmMemoryHeapCommitted) updateCapacity() {
	if m.data.Sum().DataPoints().Len() > m.capacity {
		m.capacity = m.data.Sum().DataPoints().Len()
	}
}

// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
func (m *metricFlinkJvmMemoryHeapCommitted) emit(metrics pmetric.MetricSlice) {
	if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 {
		m.updateCapacity()
		m.data.MoveTo(metrics.AppendEmpty())
		m.init()
	}
}

// newMetricFlinkJvmMemoryHeapCommitted builds the metric; the data buffer is
// initialized only when the metric is enabled in config.
func newMetricFlinkJvmMemoryHeapCommitted(cfg MetricConfig) metricFlinkJvmMemoryHeapCommitted {
	m := metricFlinkJvmMemoryHeapCommitted{config: cfg}
	if cfg.Enabled {
		m.data = pmetric.NewMetric()
		m.init()
	}
	return m
}
// metricFlinkJvmMemoryHeapMax buffers data points for the
// flink.jvm.memory.heap.max metric until emit is called.
type metricFlinkJvmMemoryHeapMax struct {
	data     pmetric.Metric // data buffer for generated metric.
	config   MetricConfig   // metric config provided by user.
	capacity int            // max observed number of data points added to the metric.
}

// init fills flink.jvm.memory.heap.max metric with initial data.
func (m *metricFlinkJvmMemoryHeapMax) init() {
	m.data.SetName("flink.jvm.memory.heap.max")
	m.data.SetDescription("The maximum amount of heap memory that can be used for memory management.")
	m.data.SetUnit("By")
	m.data.SetEmptySum()
	m.data.Sum().SetIsMonotonic(false)
	m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
}

// recordDataPoint appends one data point with the given timestamps and value;
// it is a no-op when the metric is disabled.
func (m *metricFlinkJvmMemoryHeapMax) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) {
	if !m.config.Enabled {
		return
	}
	dp := m.data.Sum().DataPoints().AppendEmpty()
	dp.SetStartTimestamp(start)
	dp.SetTimestamp(ts)
	dp.SetIntValue(val)
}

// updateCapacity saves max length of data point slices that will be used for the slice capacity.
func (m *metricFlinkJvmMemoryHeapMax) updateCapacity() {
	if m.data.Sum().DataPoints().Len() > m.capacity {
		m.capacity = m.data.Sum().DataPoints().Len()
	}
}

// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
func (m *metricFlinkJvmMemoryHeapMax) emit(metrics pmetric.MetricSlice) {
	if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 {
		m.updateCapacity()
		m.data.MoveTo(metrics.AppendEmpty())
		m.init()
	}
}

// newMetricFlinkJvmMemoryHeapMax builds the metric; the data buffer is
// initialized only when the metric is enabled in config.
func newMetricFlinkJvmMemoryHeapMax(cfg MetricConfig) metricFlinkJvmMemoryHeapMax {
	m := metricFlinkJvmMemoryHeapMax{config: cfg}
	if cfg.Enabled {
		m.data = pmetric.NewMetric()
		m.init()
	}
	return m
}
// metricFlinkJvmMemoryHeapUsed buffers data points for the
// flink.jvm.memory.heap.used metric until emit is called.
type metricFlinkJvmMemoryHeapUsed struct {
	data     pmetric.Metric // data buffer for generated metric.
	config   MetricConfig   // metric config provided by user.
	capacity int            // max observed number of data points added to the metric.
}

// init fills flink.jvm.memory.heap.used metric with initial data.
func (m *metricFlinkJvmMemoryHeapUsed) init() {
	m.data.SetName("flink.jvm.memory.heap.used")
	m.data.SetDescription("The amount of heap memory currently used.")
	m.data.SetUnit("By")
	m.data.SetEmptySum()
	m.data.Sum().SetIsMonotonic(false)
	m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
}

// recordDataPoint appends one data point with the given timestamps and value;
// it is a no-op when the metric is disabled.
func (m *metricFlinkJvmMemoryHeapUsed) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) {
	if !m.config.Enabled {
		return
	}
	dp := m.data.Sum().DataPoints().AppendEmpty()
	dp.SetStartTimestamp(start)
	dp.SetTimestamp(ts)
	dp.SetIntValue(val)
}

// updateCapacity saves max length of data point slices that will be used for the slice capacity.
func (m *metricFlinkJvmMemoryHeapUsed) updateCapacity() {
	if m.data.Sum().DataPoints().Len() > m.capacity {
		m.capacity = m.data.Sum().DataPoints().Len()
	}
}

// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
func (m *metricFlinkJvmMemoryHeapUsed) emit(metrics pmetric.MetricSlice) {
	if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 {
		m.updateCapacity()
		m.data.MoveTo(metrics.AppendEmpty())
		m.init()
	}
}

// newMetricFlinkJvmMemoryHeapUsed builds the metric; the data buffer is
// initialized only when the metric is enabled in config.
func newMetricFlinkJvmMemoryHeapUsed(cfg MetricConfig) metricFlinkJvmMemoryHeapUsed {
	m := metricFlinkJvmMemoryHeapUsed{config: cfg}
	if cfg.Enabled {
		m.data = pmetric.NewMetric()
		m.init()
	}
	return m
}
// metricFlinkJvmMemoryMappedTotalCapacity buffers data points for the
// flink.jvm.memory.mapped.total_capacity metric (non-monotonic cumulative sum).
type metricFlinkJvmMemoryMappedTotalCapacity struct {
	data     pmetric.Metric // data buffer for generated metric.
	config   MetricConfig   // metric config provided by user.
	capacity int            // max observed number of data points added to the metric.
}

// init fills flink.jvm.memory.mapped.total_capacity metric with initial data.
func (m *metricFlinkJvmMemoryMappedTotalCapacity) init() {
	m.data.SetName("flink.jvm.memory.mapped.total_capacity")
	// NOTE(review): description says "number of buffers" but the name and unit
	// ("By") indicate a byte capacity — looks like a metadata.yaml issue; any
	// fix must go through mdatagen, not this generated file.
	m.data.SetDescription("The number of buffers in the mapped buffer pool.")
	m.data.SetUnit("By")
	m.data.SetEmptySum()
	m.data.Sum().SetIsMonotonic(false)
	m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
}

// recordDataPoint appends one int data point; no-op when the metric is disabled.
func (m *metricFlinkJvmMemoryMappedTotalCapacity) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) {
	if !m.config.Enabled {
		return
	}
	dp := m.data.Sum().DataPoints().AppendEmpty()
	dp.SetStartTimestamp(start)
	dp.SetTimestamp(ts)
	dp.SetIntValue(val)
}

// updateCapacity saves max length of data point slices that will be used for the slice capacity.
func (m *metricFlinkJvmMemoryMappedTotalCapacity) updateCapacity() {
	if m.data.Sum().DataPoints().Len() > m.capacity {
		m.capacity = m.data.Sum().DataPoints().Len()
	}
}

// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
func (m *metricFlinkJvmMemoryMappedTotalCapacity) emit(metrics pmetric.MetricSlice) {
	if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 {
		m.updateCapacity()
		m.data.MoveTo(metrics.AppendEmpty())
		m.init()
	}
}

// newMetricFlinkJvmMemoryMappedTotalCapacity builds the recorder; the buffer is
// allocated only when the metric is enabled in user config.
func newMetricFlinkJvmMemoryMappedTotalCapacity(cfg MetricConfig) metricFlinkJvmMemoryMappedTotalCapacity {
	m := metricFlinkJvmMemoryMappedTotalCapacity{config: cfg}
	if cfg.Enabled {
		m.data = pmetric.NewMetric()
		m.init()
	}
	return m
}
// metricFlinkJvmMemoryMappedUsed buffers data points for the
// flink.jvm.memory.mapped.used metric (non-monotonic cumulative sum, bytes).
type metricFlinkJvmMemoryMappedUsed struct {
	data     pmetric.Metric // data buffer for generated metric.
	config   MetricConfig   // metric config provided by user.
	capacity int            // max observed number of data points added to the metric.
}

// init fills flink.jvm.memory.mapped.used metric with initial data.
func (m *metricFlinkJvmMemoryMappedUsed) init() {
	m.data.SetName("flink.jvm.memory.mapped.used")
	m.data.SetDescription("The amount of memory used by the JVM for the mapped buffer pool.")
	m.data.SetUnit("By")
	m.data.SetEmptySum()
	m.data.Sum().SetIsMonotonic(false)
	m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
}

// recordDataPoint appends one int data point; no-op when the metric is disabled.
func (m *metricFlinkJvmMemoryMappedUsed) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) {
	if !m.config.Enabled {
		return
	}
	dp := m.data.Sum().DataPoints().AppendEmpty()
	dp.SetStartTimestamp(start)
	dp.SetTimestamp(ts)
	dp.SetIntValue(val)
}

// updateCapacity saves max length of data point slices that will be used for the slice capacity.
func (m *metricFlinkJvmMemoryMappedUsed) updateCapacity() {
	if m.data.Sum().DataPoints().Len() > m.capacity {
		m.capacity = m.data.Sum().DataPoints().Len()
	}
}

// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
func (m *metricFlinkJvmMemoryMappedUsed) emit(metrics pmetric.MetricSlice) {
	if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 {
		m.updateCapacity()
		m.data.MoveTo(metrics.AppendEmpty())
		m.init()
	}
}

// newMetricFlinkJvmMemoryMappedUsed builds the recorder; the buffer is
// allocated only when the metric is enabled in user config.
func newMetricFlinkJvmMemoryMappedUsed(cfg MetricConfig) metricFlinkJvmMemoryMappedUsed {
	m := metricFlinkJvmMemoryMappedUsed{config: cfg}
	if cfg.Enabled {
		m.data = pmetric.NewMetric()
		m.init()
	}
	return m
}
// metricFlinkJvmMemoryMetaspaceCommitted buffers data points for the
// flink.jvm.memory.metaspace.committed metric (non-monotonic cumulative sum, bytes).
type metricFlinkJvmMemoryMetaspaceCommitted struct {
	data     pmetric.Metric // data buffer for generated metric.
	config   MetricConfig   // metric config provided by user.
	capacity int            // max observed number of data points added to the metric.
}

// init fills flink.jvm.memory.metaspace.committed metric with initial data.
func (m *metricFlinkJvmMemoryMetaspaceCommitted) init() {
	m.data.SetName("flink.jvm.memory.metaspace.committed")
	m.data.SetDescription("The amount of memory guaranteed to be available to the JVM in the Metaspace memory pool.")
	m.data.SetUnit("By")
	m.data.SetEmptySum()
	m.data.Sum().SetIsMonotonic(false)
	m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
}

// recordDataPoint appends one int data point; no-op when the metric is disabled.
func (m *metricFlinkJvmMemoryMetaspaceCommitted) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) {
	if !m.config.Enabled {
		return
	}
	dp := m.data.Sum().DataPoints().AppendEmpty()
	dp.SetStartTimestamp(start)
	dp.SetTimestamp(ts)
	dp.SetIntValue(val)
}

// updateCapacity saves max length of data point slices that will be used for the slice capacity.
func (m *metricFlinkJvmMemoryMetaspaceCommitted) updateCapacity() {
	if m.data.Sum().DataPoints().Len() > m.capacity {
		m.capacity = m.data.Sum().DataPoints().Len()
	}
}

// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
func (m *metricFlinkJvmMemoryMetaspaceCommitted) emit(metrics pmetric.MetricSlice) {
	if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 {
		m.updateCapacity()
		m.data.MoveTo(metrics.AppendEmpty())
		m.init()
	}
}

// newMetricFlinkJvmMemoryMetaspaceCommitted builds the recorder; the buffer is
// allocated only when the metric is enabled in user config.
func newMetricFlinkJvmMemoryMetaspaceCommitted(cfg MetricConfig) metricFlinkJvmMemoryMetaspaceCommitted {
	m := metricFlinkJvmMemoryMetaspaceCommitted{config: cfg}
	if cfg.Enabled {
		m.data = pmetric.NewMetric()
		m.init()
	}
	return m
}
// metricFlinkJvmMemoryMetaspaceMax buffers data points for the
// flink.jvm.memory.metaspace.max metric (non-monotonic cumulative sum, bytes).
type metricFlinkJvmMemoryMetaspaceMax struct {
	data     pmetric.Metric // data buffer for generated metric.
	config   MetricConfig   // metric config provided by user.
	capacity int            // max observed number of data points added to the metric.
}

// init fills flink.jvm.memory.metaspace.max metric with initial data.
func (m *metricFlinkJvmMemoryMetaspaceMax) init() {
	m.data.SetName("flink.jvm.memory.metaspace.max")
	m.data.SetDescription("The maximum amount of memory that can be used in the Metaspace memory pool.")
	m.data.SetUnit("By")
	m.data.SetEmptySum()
	m.data.Sum().SetIsMonotonic(false)
	m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
}

// recordDataPoint appends one int data point; no-op when the metric is disabled.
func (m *metricFlinkJvmMemoryMetaspaceMax) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) {
	if !m.config.Enabled {
		return
	}
	dp := m.data.Sum().DataPoints().AppendEmpty()
	dp.SetStartTimestamp(start)
	dp.SetTimestamp(ts)
	dp.SetIntValue(val)
}

// updateCapacity saves max length of data point slices that will be used for the slice capacity.
func (m *metricFlinkJvmMemoryMetaspaceMax) updateCapacity() {
	if m.data.Sum().DataPoints().Len() > m.capacity {
		m.capacity = m.data.Sum().DataPoints().Len()
	}
}

// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
func (m *metricFlinkJvmMemoryMetaspaceMax) emit(metrics pmetric.MetricSlice) {
	if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 {
		m.updateCapacity()
		m.data.MoveTo(metrics.AppendEmpty())
		m.init()
	}
}

// newMetricFlinkJvmMemoryMetaspaceMax builds the recorder; the buffer is
// allocated only when the metric is enabled in user config.
func newMetricFlinkJvmMemoryMetaspaceMax(cfg MetricConfig) metricFlinkJvmMemoryMetaspaceMax {
	m := metricFlinkJvmMemoryMetaspaceMax{config: cfg}
	if cfg.Enabled {
		m.data = pmetric.NewMetric()
		m.init()
	}
	return m
}
// metricFlinkJvmMemoryMetaspaceUsed buffers data points for the
// flink.jvm.memory.metaspace.used metric (non-monotonic cumulative sum, bytes).
type metricFlinkJvmMemoryMetaspaceUsed struct {
	data     pmetric.Metric // data buffer for generated metric.
	config   MetricConfig   // metric config provided by user.
	capacity int            // max observed number of data points added to the metric.
}

// init fills flink.jvm.memory.metaspace.used metric with initial data.
func (m *metricFlinkJvmMemoryMetaspaceUsed) init() {
	m.data.SetName("flink.jvm.memory.metaspace.used")
	m.data.SetDescription("The amount of memory currently used in the Metaspace memory pool.")
	m.data.SetUnit("By")
	m.data.SetEmptySum()
	m.data.Sum().SetIsMonotonic(false)
	m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
}

// recordDataPoint appends one int data point; no-op when the metric is disabled.
func (m *metricFlinkJvmMemoryMetaspaceUsed) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) {
	if !m.config.Enabled {
		return
	}
	dp := m.data.Sum().DataPoints().AppendEmpty()
	dp.SetStartTimestamp(start)
	dp.SetTimestamp(ts)
	dp.SetIntValue(val)
}

// updateCapacity saves max length of data point slices that will be used for the slice capacity.
func (m *metricFlinkJvmMemoryMetaspaceUsed) updateCapacity() {
	if m.data.Sum().DataPoints().Len() > m.capacity {
		m.capacity = m.data.Sum().DataPoints().Len()
	}
}

// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
func (m *metricFlinkJvmMemoryMetaspaceUsed) emit(metrics pmetric.MetricSlice) {
	if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 {
		m.updateCapacity()
		m.data.MoveTo(metrics.AppendEmpty())
		m.init()
	}
}

// newMetricFlinkJvmMemoryMetaspaceUsed builds the recorder; the buffer is
// allocated only when the metric is enabled in user config.
func newMetricFlinkJvmMemoryMetaspaceUsed(cfg MetricConfig) metricFlinkJvmMemoryMetaspaceUsed {
	m := metricFlinkJvmMemoryMetaspaceUsed{config: cfg}
	if cfg.Enabled {
		m.data = pmetric.NewMetric()
		m.init()
	}
	return m
}
// metricFlinkJvmMemoryNonheapCommitted buffers data points for the
// flink.jvm.memory.nonheap.committed metric (non-monotonic cumulative sum, bytes).
type metricFlinkJvmMemoryNonheapCommitted struct {
	data     pmetric.Metric // data buffer for generated metric.
	config   MetricConfig   // metric config provided by user.
	capacity int            // max observed number of data points added to the metric.
}

// init fills flink.jvm.memory.nonheap.committed metric with initial data.
func (m *metricFlinkJvmMemoryNonheapCommitted) init() {
	m.data.SetName("flink.jvm.memory.nonheap.committed")
	m.data.SetDescription("The amount of non-heap memory guaranteed to be available to the JVM.")
	m.data.SetUnit("By")
	m.data.SetEmptySum()
	m.data.Sum().SetIsMonotonic(false)
	m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
}

// recordDataPoint appends one int data point; no-op when the metric is disabled.
func (m *metricFlinkJvmMemoryNonheapCommitted) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) {
	if !m.config.Enabled {
		return
	}
	dp := m.data.Sum().DataPoints().AppendEmpty()
	dp.SetStartTimestamp(start)
	dp.SetTimestamp(ts)
	dp.SetIntValue(val)
}

// updateCapacity saves max length of data point slices that will be used for the slice capacity.
func (m *metricFlinkJvmMemoryNonheapCommitted) updateCapacity() {
	if m.data.Sum().DataPoints().Len() > m.capacity {
		m.capacity = m.data.Sum().DataPoints().Len()
	}
}

// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
func (m *metricFlinkJvmMemoryNonheapCommitted) emit(metrics pmetric.MetricSlice) {
	if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 {
		m.updateCapacity()
		m.data.MoveTo(metrics.AppendEmpty())
		m.init()
	}
}

// newMetricFlinkJvmMemoryNonheapCommitted builds the recorder; the buffer is
// allocated only when the metric is enabled in user config.
func newMetricFlinkJvmMemoryNonheapCommitted(cfg MetricConfig) metricFlinkJvmMemoryNonheapCommitted {
	m := metricFlinkJvmMemoryNonheapCommitted{config: cfg}
	if cfg.Enabled {
		m.data = pmetric.NewMetric()
		m.init()
	}
	return m
}
// metricFlinkJvmMemoryNonheapMax buffers data points for the
// flink.jvm.memory.nonheap.max metric (non-monotonic cumulative sum, bytes).
type metricFlinkJvmMemoryNonheapMax struct {
	data     pmetric.Metric // data buffer for generated metric.
	config   MetricConfig   // metric config provided by user.
	capacity int            // max observed number of data points added to the metric.
}

// init fills flink.jvm.memory.nonheap.max metric with initial data.
func (m *metricFlinkJvmMemoryNonheapMax) init() {
	m.data.SetName("flink.jvm.memory.nonheap.max")
	m.data.SetDescription("The maximum amount of non-heap memory that can be used for memory management.")
	m.data.SetUnit("By")
	m.data.SetEmptySum()
	m.data.Sum().SetIsMonotonic(false)
	m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
}

// recordDataPoint appends one int data point; no-op when the metric is disabled.
func (m *metricFlinkJvmMemoryNonheapMax) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) {
	if !m.config.Enabled {
		return
	}
	dp := m.data.Sum().DataPoints().AppendEmpty()
	dp.SetStartTimestamp(start)
	dp.SetTimestamp(ts)
	dp.SetIntValue(val)
}

// updateCapacity saves max length of data point slices that will be used for the slice capacity.
func (m *metricFlinkJvmMemoryNonheapMax) updateCapacity() {
	if m.data.Sum().DataPoints().Len() > m.capacity {
		m.capacity = m.data.Sum().DataPoints().Len()
	}
}

// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
func (m *metricFlinkJvmMemoryNonheapMax) emit(metrics pmetric.MetricSlice) {
	if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 {
		m.updateCapacity()
		m.data.MoveTo(metrics.AppendEmpty())
		m.init()
	}
}

// newMetricFlinkJvmMemoryNonheapMax builds the recorder; the buffer is
// allocated only when the metric is enabled in user config.
func newMetricFlinkJvmMemoryNonheapMax(cfg MetricConfig) metricFlinkJvmMemoryNonheapMax {
	m := metricFlinkJvmMemoryNonheapMax{config: cfg}
	if cfg.Enabled {
		m.data = pmetric.NewMetric()
		m.init()
	}
	return m
}
// metricFlinkJvmMemoryNonheapUsed buffers data points for the
// flink.jvm.memory.nonheap.used metric (non-monotonic cumulative sum, bytes).
type metricFlinkJvmMemoryNonheapUsed struct {
	data     pmetric.Metric // data buffer for generated metric.
	config   MetricConfig   // metric config provided by user.
	capacity int            // max observed number of data points added to the metric.
}

// init fills flink.jvm.memory.nonheap.used metric with initial data.
func (m *metricFlinkJvmMemoryNonheapUsed) init() {
	m.data.SetName("flink.jvm.memory.nonheap.used")
	m.data.SetDescription("The amount of non-heap memory currently used.")
	m.data.SetUnit("By")
	m.data.SetEmptySum()
	m.data.Sum().SetIsMonotonic(false)
	m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
}

// recordDataPoint appends one int data point; no-op when the metric is disabled.
func (m *metricFlinkJvmMemoryNonheapUsed) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) {
	if !m.config.Enabled {
		return
	}
	dp := m.data.Sum().DataPoints().AppendEmpty()
	dp.SetStartTimestamp(start)
	dp.SetTimestamp(ts)
	dp.SetIntValue(val)
}

// updateCapacity saves max length of data point slices that will be used for the slice capacity.
func (m *metricFlinkJvmMemoryNonheapUsed) updateCapacity() {
	if m.data.Sum().DataPoints().Len() > m.capacity {
		m.capacity = m.data.Sum().DataPoints().Len()
	}
}

// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
func (m *metricFlinkJvmMemoryNonheapUsed) emit(metrics pmetric.MetricSlice) {
	if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 {
		m.updateCapacity()
		m.data.MoveTo(metrics.AppendEmpty())
		m.init()
	}
}

// newMetricFlinkJvmMemoryNonheapUsed builds the recorder; the buffer is
// allocated only when the metric is enabled in user config.
func newMetricFlinkJvmMemoryNonheapUsed(cfg MetricConfig) metricFlinkJvmMemoryNonheapUsed {
	m := metricFlinkJvmMemoryNonheapUsed{config: cfg}
	if cfg.Enabled {
		m.data = pmetric.NewMetric()
		m.init()
	}
	return m
}
// metricFlinkJvmThreadsCount buffers data points for the
// flink.jvm.threads.count metric (non-monotonic cumulative sum, unit "{threads}").
type metricFlinkJvmThreadsCount struct {
	data     pmetric.Metric // data buffer for generated metric.
	config   MetricConfig   // metric config provided by user.
	capacity int            // max observed number of data points added to the metric.
}

// init fills flink.jvm.threads.count metric with initial data.
func (m *metricFlinkJvmThreadsCount) init() {
	m.data.SetName("flink.jvm.threads.count")
	m.data.SetDescription("The total number of live threads.")
	m.data.SetUnit("{threads}")
	m.data.SetEmptySum()
	m.data.Sum().SetIsMonotonic(false)
	m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
}

// recordDataPoint appends one int data point; no-op when the metric is disabled.
func (m *metricFlinkJvmThreadsCount) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) {
	if !m.config.Enabled {
		return
	}
	dp := m.data.Sum().DataPoints().AppendEmpty()
	dp.SetStartTimestamp(start)
	dp.SetTimestamp(ts)
	dp.SetIntValue(val)
}

// updateCapacity saves max length of data point slices that will be used for the slice capacity.
func (m *metricFlinkJvmThreadsCount) updateCapacity() {
	if m.data.Sum().DataPoints().Len() > m.capacity {
		m.capacity = m.data.Sum().DataPoints().Len()
	}
}

// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
func (m *metricFlinkJvmThreadsCount) emit(metrics pmetric.MetricSlice) {
	if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 {
		m.updateCapacity()
		m.data.MoveTo(metrics.AppendEmpty())
		m.init()
	}
}

// newMetricFlinkJvmThreadsCount builds the recorder; the buffer is allocated
// only when the metric is enabled in user config.
func newMetricFlinkJvmThreadsCount(cfg MetricConfig) metricFlinkJvmThreadsCount {
	m := metricFlinkJvmThreadsCount{config: cfg}
	if cfg.Enabled {
		m.data = pmetric.NewMetric()
		m.init()
	}
	return m
}
// metricFlinkMemoryManagedTotal buffers data points for the
// flink.memory.managed.total metric (non-monotonic cumulative sum, bytes).
type metricFlinkMemoryManagedTotal struct {
	data     pmetric.Metric // data buffer for generated metric.
	config   MetricConfig   // metric config provided by user.
	capacity int            // max observed number of data points added to the metric.
}

// init fills flink.memory.managed.total metric with initial data.
func (m *metricFlinkMemoryManagedTotal) init() {
	m.data.SetName("flink.memory.managed.total")
	m.data.SetDescription("The total amount of managed memory.")
	m.data.SetUnit("By")
	m.data.SetEmptySum()
	m.data.Sum().SetIsMonotonic(false)
	m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
}

// recordDataPoint appends one int data point; no-op when the metric is disabled.
func (m *metricFlinkMemoryManagedTotal) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) {
	if !m.config.Enabled {
		return
	}
	dp := m.data.Sum().DataPoints().AppendEmpty()
	dp.SetStartTimestamp(start)
	dp.SetTimestamp(ts)
	dp.SetIntValue(val)
}

// updateCapacity saves max length of data point slices that will be used for the slice capacity.
func (m *metricFlinkMemoryManagedTotal) updateCapacity() {
	if m.data.Sum().DataPoints().Len() > m.capacity {
		m.capacity = m.data.Sum().DataPoints().Len()
	}
}

// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
func (m *metricFlinkMemoryManagedTotal) emit(metrics pmetric.MetricSlice) {
	if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 {
		m.updateCapacity()
		m.data.MoveTo(metrics.AppendEmpty())
		m.init()
	}
}

// newMetricFlinkMemoryManagedTotal builds the recorder; the buffer is allocated
// only when the metric is enabled in user config.
func newMetricFlinkMemoryManagedTotal(cfg MetricConfig) metricFlinkMemoryManagedTotal {
	m := metricFlinkMemoryManagedTotal{config: cfg}
	if cfg.Enabled {
		m.data = pmetric.NewMetric()
		m.init()
	}
	return m
}
// metricFlinkMemoryManagedUsed buffers data points for the
// flink.memory.managed.used metric (non-monotonic cumulative sum, bytes).
type metricFlinkMemoryManagedUsed struct {
	data     pmetric.Metric // data buffer for generated metric.
	config   MetricConfig   // metric config provided by user.
	capacity int            // max observed number of data points added to the metric.
}

// init fills flink.memory.managed.used metric with initial data.
func (m *metricFlinkMemoryManagedUsed) init() {
	m.data.SetName("flink.memory.managed.used")
	m.data.SetDescription("The amount of managed memory currently used.")
	m.data.SetUnit("By")
	m.data.SetEmptySum()
	m.data.Sum().SetIsMonotonic(false)
	m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
}

// recordDataPoint appends one int data point; no-op when the metric is disabled.
func (m *metricFlinkMemoryManagedUsed) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) {
	if !m.config.Enabled {
		return
	}
	dp := m.data.Sum().DataPoints().AppendEmpty()
	dp.SetStartTimestamp(start)
	dp.SetTimestamp(ts)
	dp.SetIntValue(val)
}

// updateCapacity saves max length of data point slices that will be used for the slice capacity.
func (m *metricFlinkMemoryManagedUsed) updateCapacity() {
	if m.data.Sum().DataPoints().Len() > m.capacity {
		m.capacity = m.data.Sum().DataPoints().Len()
	}
}

// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
func (m *metricFlinkMemoryManagedUsed) emit(metrics pmetric.MetricSlice) {
	if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 {
		m.updateCapacity()
		m.data.MoveTo(metrics.AppendEmpty())
		m.init()
	}
}

// newMetricFlinkMemoryManagedUsed builds the recorder; the buffer is allocated
// only when the metric is enabled in user config.
func newMetricFlinkMemoryManagedUsed(cfg MetricConfig) metricFlinkMemoryManagedUsed {
	m := metricFlinkMemoryManagedUsed{config: cfg}
	if cfg.Enabled {
		m.data = pmetric.NewMetric()
		m.init()
	}
	return m
}
// metricFlinkOperatorRecordCount buffers data points for the
// flink.operator.record.count metric (monotonic cumulative sum) with
// per-point "name" and "record" attributes.
type metricFlinkOperatorRecordCount struct {
	data     pmetric.Metric // data buffer for generated metric.
	config   MetricConfig   // metric config provided by user.
	capacity int            // max observed number of data points added to the metric.
}

// init fills flink.operator.record.count metric with initial data.
func (m *metricFlinkOperatorRecordCount) init() {
	m.data.SetName("flink.operator.record.count")
	m.data.SetDescription("The number of records an operator has.")
	m.data.SetUnit("{records}")
	m.data.SetEmptySum()
	m.data.Sum().SetIsMonotonic(true)
	m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
	// Pre-size the data point slice using the high-water mark from prior scrapes.
	m.data.Sum().DataPoints().EnsureCapacity(m.capacity)
}

// recordDataPoint appends one attributed int data point; no-op when the metric is disabled.
func (m *metricFlinkOperatorRecordCount) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64, operatorNameAttributeValue string, recordAttributeValue string) {
	if !m.config.Enabled {
		return
	}
	dp := m.data.Sum().DataPoints().AppendEmpty()
	dp.SetStartTimestamp(start)
	dp.SetTimestamp(ts)
	dp.SetIntValue(val)
	dp.Attributes().PutStr("name", operatorNameAttributeValue)
	dp.Attributes().PutStr("record", recordAttributeValue)
}

// updateCapacity saves max length of data point slices that will be used for the slice capacity.
func (m *metricFlinkOperatorRecordCount) updateCapacity() {
	if m.data.Sum().DataPoints().Len() > m.capacity {
		m.capacity = m.data.Sum().DataPoints().Len()
	}
}

// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
func (m *metricFlinkOperatorRecordCount) emit(metrics pmetric.MetricSlice) {
	if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 {
		m.updateCapacity()
		m.data.MoveTo(metrics.AppendEmpty())
		m.init()
	}
}

// newMetricFlinkOperatorRecordCount builds the recorder; the buffer is
// allocated only when the metric is enabled in user config.
func newMetricFlinkOperatorRecordCount(cfg MetricConfig) metricFlinkOperatorRecordCount {
	m := metricFlinkOperatorRecordCount{config: cfg}
	if cfg.Enabled {
		m.data = pmetric.NewMetric()
		m.init()
	}
	return m
}
// metricFlinkOperatorWatermarkOutput buffers data points for the
// flink.operator.watermark.output metric (non-monotonic cumulative sum, ms)
// with a per-point operator "name" attribute.
type metricFlinkOperatorWatermarkOutput struct {
	data     pmetric.Metric // data buffer for generated metric.
	config   MetricConfig   // metric config provided by user.
	capacity int            // max observed number of data points added to the metric.
}

// init fills flink.operator.watermark.output metric with initial data.
func (m *metricFlinkOperatorWatermarkOutput) init() {
	m.data.SetName("flink.operator.watermark.output")
	m.data.SetDescription("The last watermark this operator has emitted.")
	m.data.SetUnit("ms")
	m.data.SetEmptySum()
	m.data.Sum().SetIsMonotonic(false)
	m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
	// Pre-size the data point slice using the high-water mark from prior scrapes.
	m.data.Sum().DataPoints().EnsureCapacity(m.capacity)
}

// recordDataPoint appends one attributed int data point; no-op when the metric is disabled.
func (m *metricFlinkOperatorWatermarkOutput) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64, operatorNameAttributeValue string) {
	if !m.config.Enabled {
		return
	}
	dp := m.data.Sum().DataPoints().AppendEmpty()
	dp.SetStartTimestamp(start)
	dp.SetTimestamp(ts)
	dp.SetIntValue(val)
	dp.Attributes().PutStr("name", operatorNameAttributeValue)
}

// updateCapacity saves max length of data point slices that will be used for the slice capacity.
func (m *metricFlinkOperatorWatermarkOutput) updateCapacity() {
	if m.data.Sum().DataPoints().Len() > m.capacity {
		m.capacity = m.data.Sum().DataPoints().Len()
	}
}

// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
func (m *metricFlinkOperatorWatermarkOutput) emit(metrics pmetric.MetricSlice) {
	if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 {
		m.updateCapacity()
		m.data.MoveTo(metrics.AppendEmpty())
		m.init()
	}
}

// newMetricFlinkOperatorWatermarkOutput builds the recorder; the buffer is
// allocated only when the metric is enabled in user config.
func newMetricFlinkOperatorWatermarkOutput(cfg MetricConfig) metricFlinkOperatorWatermarkOutput {
	m := metricFlinkOperatorWatermarkOutput{config: cfg}
	if cfg.Enabled {
		m.data = pmetric.NewMetric()
		m.init()
	}
	return m
}
// metricFlinkTaskRecordCount buffers data points for the
// flink.task.record.count metric (monotonic cumulative sum) with a
// per-point "record" attribute.
type metricFlinkTaskRecordCount struct {
	data     pmetric.Metric // data buffer for generated metric.
	config   MetricConfig   // metric config provided by user.
	capacity int            // max observed number of data points added to the metric.
}

// init fills flink.task.record.count metric with initial data.
func (m *metricFlinkTaskRecordCount) init() {
	m.data.SetName("flink.task.record.count")
	m.data.SetDescription("The number of records a task has.")
	m.data.SetUnit("{records}")
	m.data.SetEmptySum()
	m.data.Sum().SetIsMonotonic(true)
	m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
	// Pre-size the data point slice using the high-water mark from prior scrapes.
	m.data.Sum().DataPoints().EnsureCapacity(m.capacity)
}

// recordDataPoint appends one attributed int data point; no-op when the metric is disabled.
func (m *metricFlinkTaskRecordCount) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64, recordAttributeValue string) {
	if !m.config.Enabled {
		return
	}
	dp := m.data.Sum().DataPoints().AppendEmpty()
	dp.SetStartTimestamp(start)
	dp.SetTimestamp(ts)
	dp.SetIntValue(val)
	dp.Attributes().PutStr("record", recordAttributeValue)
}

// updateCapacity saves max length of data point slices that will be used for the slice capacity.
func (m *metricFlinkTaskRecordCount) updateCapacity() {
	if m.data.Sum().DataPoints().Len() > m.capacity {
		m.capacity = m.data.Sum().DataPoints().Len()
	}
}

// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
func (m *metricFlinkTaskRecordCount) emit(metrics pmetric.MetricSlice) {
	if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 {
		m.updateCapacity()
		m.data.MoveTo(metrics.AppendEmpty())
		m.init()
	}
}

// newMetricFlinkTaskRecordCount builds the recorder; the buffer is allocated
// only when the metric is enabled in user config.
func newMetricFlinkTaskRecordCount(cfg MetricConfig) metricFlinkTaskRecordCount {
	m := metricFlinkTaskRecordCount{config: cfg}
	if cfg.Enabled {
		m.data = pmetric.NewMetric()
		m.init()
	}
	return m
}
// MetricsBuilder provides an interface for scrapers to report metrics while taking care of all the transformations
// required to produce metric representation defined in metadata and user config.
// It holds one per-metric recorder for every metric this receiver can emit;
// recorders accumulate data points until the builder emits them as a resource.
type MetricsBuilder struct {
	config                                  MetricsBuilderConfig // config of the metrics builder.
	startTime                               pcommon.Timestamp    // start time that will be applied to all recorded data points.
	metricsCapacity                         int                  // maximum observed number of metrics per resource.
	metricsBuffer                           pmetric.Metrics      // accumulates metrics data before emitting.
	buildInfo                               component.BuildInfo  // contains version information.
	resourceAttributeIncludeFilter          map[string]filter.Filter
	resourceAttributeExcludeFilter          map[string]filter.Filter
	metricFlinkJobCheckpointCount           metricFlinkJobCheckpointCount
	metricFlinkJobCheckpointInProgress      metricFlinkJobCheckpointInProgress
	metricFlinkJobLastCheckpointSize        metricFlinkJobLastCheckpointSize
	metricFlinkJobLastCheckpointTime        metricFlinkJobLastCheckpointTime
	metricFlinkJobRestartCount              metricFlinkJobRestartCount
	metricFlinkJvmClassLoaderClassesLoaded  metricFlinkJvmClassLoaderClassesLoaded
	metricFlinkJvmCPULoad                   metricFlinkJvmCPULoad
	metricFlinkJvmCPUTime                   metricFlinkJvmCPUTime
	metricFlinkJvmGcCollectionsCount        metricFlinkJvmGcCollectionsCount
	metricFlinkJvmGcCollectionsTime         metricFlinkJvmGcCollectionsTime
	metricFlinkJvmMemoryDirectTotalCapacity metricFlinkJvmMemoryDirectTotalCapacity
	metricFlinkJvmMemoryDirectUsed          metricFlinkJvmMemoryDirectUsed
	metricFlinkJvmMemoryHeapCommitted       metricFlinkJvmMemoryHeapCommitted
	metricFlinkJvmMemoryHeapMax             metricFlinkJvmMemoryHeapMax
	metricFlinkJvmMemoryHeapUsed            metricFlinkJvmMemoryHeapUsed
	metricFlinkJvmMemoryMappedTotalCapacity metricFlinkJvmMemoryMappedTotalCapacity
	metricFlinkJvmMemoryMappedUsed          metricFlinkJvmMemoryMappedUsed
	metricFlinkJvmMemoryMetaspaceCommitted  metricFlinkJvmMemoryMetaspaceCommitted
	metricFlinkJvmMemoryMetaspaceMax        metricFlinkJvmMemoryMetaspaceMax
	metricFlinkJvmMemoryMetaspaceUsed       metricFlinkJvmMemoryMetaspaceUsed
	metricFlinkJvmMemoryNonheapCommitted    metricFlinkJvmMemoryNonheapCommitted
	metricFlinkJvmMemoryNonheapMax          metricFlinkJvmMemoryNonheapMax
	metricFlinkJvmMemoryNonheapUsed         metricFlinkJvmMemoryNonheapUsed
	metricFlinkJvmThreadsCount              metricFlinkJvmThreadsCount
	metricFlinkMemoryManagedTotal           metricFlinkMemoryManagedTotal
	metricFlinkMemoryManagedUsed            metricFlinkMemoryManagedUsed
	metricFlinkOperatorRecordCount          metricFlinkOperatorRecordCount
	metricFlinkOperatorWatermarkOutput      metricFlinkOperatorWatermarkOutput
	metricFlinkTaskRecordCount              metricFlinkTaskRecordCount
}
// MetricBuilderOption applies changes to default metrics builder.
type MetricBuilderOption interface {
	apply(*MetricsBuilder)
}

// metricBuilderOptionFunc adapts a plain function to the MetricBuilderOption
// interface (standard functional-options adapter).
type metricBuilderOptionFunc func(mb *MetricsBuilder)

// apply invokes the wrapped function on the builder.
func (mbof metricBuilderOptionFunc) apply(mb *MetricsBuilder) {
	mbof(mb)
}
// WithStartTime sets startTime on the metrics builder.
// It overrides the default of time.Now() captured in NewMetricsBuilder.
func WithStartTime(startTime pcommon.Timestamp) MetricBuilderOption {
	return metricBuilderOptionFunc(func(mb *MetricsBuilder) {
		mb.startTime = startTime
	})
}
// NewMetricsBuilder creates a MetricsBuilder from the given config and receiver
// settings, wiring one recorder per metric and compiling optional per-resource-
// attribute include/exclude filters. Options are applied last, so they can
// override defaults such as startTime.
func NewMetricsBuilder(mbc MetricsBuilderConfig, settings receiver.Settings, options ...MetricBuilderOption) *MetricsBuilder {
	mb := &MetricsBuilder{
		config:                                  mbc,
		startTime:                               pcommon.NewTimestampFromTime(time.Now()),
		metricsBuffer:                           pmetric.NewMetrics(),
		buildInfo:                               settings.BuildInfo,
		metricFlinkJobCheckpointCount:           newMetricFlinkJobCheckpointCount(mbc.Metrics.FlinkJobCheckpointCount),
		metricFlinkJobCheckpointInProgress:      newMetricFlinkJobCheckpointInProgress(mbc.Metrics.FlinkJobCheckpointInProgress),
		metricFlinkJobLastCheckpointSize:        newMetricFlinkJobLastCheckpointSize(mbc.Metrics.FlinkJobLastCheckpointSize),
		metricFlinkJobLastCheckpointTime:        newMetricFlinkJobLastCheckpointTime(mbc.Metrics.FlinkJobLastCheckpointTime),
		metricFlinkJobRestartCount:              newMetricFlinkJobRestartCount(mbc.Metrics.FlinkJobRestartCount),
		metricFlinkJvmClassLoaderClassesLoaded:  newMetricFlinkJvmClassLoaderClassesLoaded(mbc.Metrics.FlinkJvmClassLoaderClassesLoaded),
		metricFlinkJvmCPULoad:                   newMetricFlinkJvmCPULoad(mbc.Metrics.FlinkJvmCPULoad),
		metricFlinkJvmCPUTime:                   newMetricFlinkJvmCPUTime(mbc.Metrics.FlinkJvmCPUTime),
		metricFlinkJvmGcCollectionsCount:        newMetricFlinkJvmGcCollectionsCount(mbc.Metrics.FlinkJvmGcCollectionsCount),
		metricFlinkJvmGcCollectionsTime:         newMetricFlinkJvmGcCollectionsTime(mbc.Metrics.FlinkJvmGcCollectionsTime),
		metricFlinkJvmMemoryDirectTotalCapacity: newMetricFlinkJvmMemoryDirectTotalCapacity(mbc.Metrics.FlinkJvmMemoryDirectTotalCapacity),
		metricFlinkJvmMemoryDirectUsed:          newMetricFlinkJvmMemoryDirectUsed(mbc.Metrics.FlinkJvmMemoryDirectUsed),
		metricFlinkJvmMemoryHeapCommitted:       newMetricFlinkJvmMemoryHeapCommitted(mbc.Metrics.FlinkJvmMemoryHeapCommitted),
		metricFlinkJvmMemoryHeapMax:             newMetricFlinkJvmMemoryHeapMax(mbc.Metrics.FlinkJvmMemoryHeapMax),
		metricFlinkJvmMemoryHeapUsed:            newMetricFlinkJvmMemoryHeapUsed(mbc.Metrics.FlinkJvmMemoryHeapUsed),
		metricFlinkJvmMemoryMappedTotalCapacity: newMetricFlinkJvmMemoryMappedTotalCapacity(mbc.Metrics.FlinkJvmMemoryMappedTotalCapacity),
		metricFlinkJvmMemoryMappedUsed:          newMetricFlinkJvmMemoryMappedUsed(mbc.Metrics.FlinkJvmMemoryMappedUsed),
		metricFlinkJvmMemoryMetaspaceCommitted:  newMetricFlinkJvmMemoryMetaspaceCommitted(mbc.Metrics.FlinkJvmMemoryMetaspaceCommitted),
		metricFlinkJvmMemoryMetaspaceMax:        newMetricFlinkJvmMemoryMetaspaceMax(mbc.Metrics.FlinkJvmMemoryMetaspaceMax),
		metricFlinkJvmMemoryMetaspaceUsed:       newMetricFlinkJvmMemoryMetaspaceUsed(mbc.Metrics.FlinkJvmMemoryMetaspaceUsed),
		metricFlinkJvmMemoryNonheapCommitted:    newMetricFlinkJvmMemoryNonheapCommitted(mbc.Metrics.FlinkJvmMemoryNonheapCommitted),
		metricFlinkJvmMemoryNonheapMax:          newMetricFlinkJvmMemoryNonheapMax(mbc.Metrics.FlinkJvmMemoryNonheapMax),
		metricFlinkJvmMemoryNonheapUsed:         newMetricFlinkJvmMemoryNonheapUsed(mbc.Metrics.FlinkJvmMemoryNonheapUsed),
		metricFlinkJvmThreadsCount:              newMetricFlinkJvmThreadsCount(mbc.Metrics.FlinkJvmThreadsCount),
		metricFlinkMemoryManagedTotal:           newMetricFlinkMemoryManagedTotal(mbc.Metrics.FlinkMemoryManagedTotal),
		metricFlinkMemoryManagedUsed:            newMetricFlinkMemoryManagedUsed(mbc.Metrics.FlinkMemoryManagedUsed),
		metricFlinkOperatorRecordCount:          newMetricFlinkOperatorRecordCount(mbc.Metrics.FlinkOperatorRecordCount),
		metricFlinkOperatorWatermarkOutput:      newMetricFlinkOperatorWatermarkOutput(mbc.Metrics.FlinkOperatorWatermarkOutput),
		metricFlinkTaskRecordCount:              newMetricFlinkTaskRecordCount(mbc.Metrics.FlinkTaskRecordCount),
		resourceAttributeIncludeFilter:          make(map[string]filter.Filter),
		resourceAttributeExcludeFilter:          make(map[string]filter.Filter),
	}
	// Compile user-supplied include/exclude patterns per resource attribute;
	// nil means "no filtering" for that attribute and direction.
	if mbc.ResourceAttributes.FlinkJobName.MetricsInclude != nil {
		mb.resourceAttributeIncludeFilter["flink.job.name"] = filter.CreateFilter(mbc.ResourceAttributes.FlinkJobName.MetricsInclude)
	}
	if mbc.ResourceAttributes.FlinkJobName.MetricsExclude != nil {
		mb.resourceAttributeExcludeFilter["flink.job.name"] = filter.CreateFilter(mbc.ResourceAttributes.FlinkJobName.MetricsExclude)
	}
	if mbc.ResourceAttributes.FlinkResourceType.MetricsInclude != nil {
		mb.resourceAttributeIncludeFilter["flink.resource.type"] = filter.CreateFilter(mbc.ResourceAttributes.FlinkResourceType.MetricsInclude)
	}
	if mbc.ResourceAttributes.FlinkResourceType.MetricsExclude != nil {
		mb.resourceAttributeExcludeFilter["flink.resource.type"] = filter.CreateFilter(mbc.ResourceAttributes.FlinkResourceType.MetricsExclude)
	}
	if mbc.ResourceAttributes.FlinkSubtaskIndex.MetricsInclude != nil {
		mb.resourceAttributeIncludeFilter["flink.subtask.index"] = filter.CreateFilter(mbc.ResourceAttributes.FlinkSubtaskIndex.MetricsInclude)
	}
	if mbc.ResourceAttributes.FlinkSubtaskIndex.MetricsExclude != nil {
		mb.resourceAttributeExcludeFilter["flink.subtask.index"] = filter.CreateFilter(mbc.ResourceAttributes.FlinkSubtaskIndex.MetricsExclude)
	}
	if mbc.ResourceAttributes.FlinkTaskName.MetricsInclude != nil {
		mb.resourceAttributeIncludeFilter["flink.task.name"] = filter.CreateFilter(mbc.ResourceAttributes.FlinkTaskName.MetricsInclude)
	}
	if mbc.ResourceAttributes.FlinkTaskName.MetricsExclude != nil {
		mb.resourceAttributeExcludeFilter["flink.task.name"] = filter.CreateFilter(mbc.ResourceAttributes.FlinkTaskName.MetricsExclude)
	}
	if mbc.ResourceAttributes.FlinkTaskmanagerID.MetricsInclude != nil {
		mb.resourceAttributeIncludeFilter["flink.taskmanager.id"] = filter.CreateFilter(mbc.ResourceAttributes.FlinkTaskmanagerID.MetricsInclude)
	}
	if mbc.ResourceAttributes.FlinkTaskmanagerID.MetricsExclude != nil {
		mb.resourceAttributeExcludeFilter["flink.taskmanager.id"] = filter.CreateFilter(mbc.ResourceAttributes.FlinkTaskmanagerID.MetricsExclude)
	}
	if mbc.ResourceAttributes.HostName.MetricsInclude != nil {
		mb.resourceAttributeIncludeFilter["host.name"] = filter.CreateFilter(mbc.ResourceAttributes.HostName.MetricsInclude)
	}
	if mbc.ResourceAttributes.HostName.MetricsExclude != nil {
		mb.resourceAttributeExcludeFilter["host.name"] = filter.CreateFilter(mbc.ResourceAttributes.HostName.MetricsExclude)
	}
	for _, op := range options {
		op.apply(mb)
	}
	return mb
}
// NewResourceBuilder returns a new resource builder that should be used to build a resource associated with the emitted metrics.
func (mb *MetricsBuilder) NewResourceBuilder() *ResourceBuilder {
	rb := NewResourceBuilder(mb.config.ResourceAttributes)
	return rb
}
// updateCapacity tracks the largest metric count seen so far; the stored value
// seeds the metrics slice capacity on subsequent emits.
func (mb *MetricsBuilder) updateCapacity(rm pmetric.ResourceMetrics) {
	if n := rm.ScopeMetrics().At(0).Metrics().Len(); n > mb.metricsCapacity {
		mb.metricsCapacity = n
	}
}
// ResourceMetricsOption applies changes to provided resource metrics.
type ResourceMetricsOption interface {
	apply(pmetric.ResourceMetrics)
}

// resourceMetricsOptionFunc adapts a plain function to the ResourceMetricsOption interface.
type resourceMetricsOptionFunc func(pmetric.ResourceMetrics)

// apply invokes the wrapped function on the provided resource metrics.
func (rmof resourceMetricsOptionFunc) apply(rm pmetric.ResourceMetrics) {
	rmof(rm)
}
// WithResource sets the provided resource on the emitted ResourceMetrics.
// It's recommended to use ResourceBuilder to create the resource.
func WithResource(res pcommon.Resource) ResourceMetricsOption {
	copyResource := func(rm pmetric.ResourceMetrics) {
		res.CopyTo(rm.Resource())
	}
	return resourceMetricsOptionFunc(copyResource)
}
// WithStartTimeOverride overrides start time for all the resource metrics data points.
// This option should be only used if different start time has to be set on metrics coming from different resources.
func WithStartTimeOverride(start pcommon.Timestamp) ResourceMetricsOption {
	return resourceMetricsOptionFunc(func(rm pmetric.ResourceMetrics) {
		metrics := rm.ScopeMetrics().At(0).Metrics()
		for i := 0; i < metrics.Len(); i++ {
			// Only gauge and sum metrics carry number data points. Declare the
			// slice per iteration and skip unsupported types: the previous code
			// kept one slice outside the loop, which would panic on the
			// zero-value slice if the first metric had another type, or
			// silently re-stamp the prior metric's points otherwise.
			var dps pmetric.NumberDataPointSlice
			switch metrics.At(i).Type() {
			case pmetric.MetricTypeGauge:
				dps = metrics.At(i).Gauge().DataPoints()
			case pmetric.MetricTypeSum:
				dps = metrics.At(i).Sum().DataPoints()
			default:
				continue
			}
			for j := 0; j < dps.Len(); j++ {
				dps.At(j).SetStartTimestamp(start)
			}
		}
	})
}
// EmitForResource saves all the generated metrics under a new resource and updates the internal state to be ready for
// recording another set of data points as part of another resource. This function can be helpful when one scraper
// needs to emit metrics from several resources. Otherwise calling this function is not required,
// just `Emit` function can be called instead.
// Resource attributes should be provided as ResourceMetricsOption arguments.
func (mb *MetricsBuilder) EmitForResource(options ...ResourceMetricsOption) {
	// Build a fresh ResourceMetrics with a single scope identifying this receiver.
	rm := pmetric.NewResourceMetrics()
	ils := rm.ScopeMetrics().AppendEmpty()
	ils.Scope().SetName(ScopeName)
	ils.Scope().SetVersion(mb.buildInfo.Version)
	// Pre-size using the largest metric count observed on earlier emits.
	ils.Metrics().EnsureCapacity(mb.metricsCapacity)
	// Move every accumulated metric into the new scope.
	mb.metricFlinkJobCheckpointCount.emit(ils.Metrics())
	mb.metricFlinkJobCheckpointInProgress.emit(ils.Metrics())
	mb.metricFlinkJobLastCheckpointSize.emit(ils.Metrics())
	mb.metricFlinkJobLastCheckpointTime.emit(ils.Metrics())
	mb.metricFlinkJobRestartCount.emit(ils.Metrics())
	mb.metricFlinkJvmClassLoaderClassesLoaded.emit(ils.Metrics())
	mb.metricFlinkJvmCPULoad.emit(ils.Metrics())
	mb.metricFlinkJvmCPUTime.emit(ils.Metrics())
	mb.metricFlinkJvmGcCollectionsCount.emit(ils.Metrics())
	mb.metricFlinkJvmGcCollectionsTime.emit(ils.Metrics())
	mb.metricFlinkJvmMemoryDirectTotalCapacity.emit(ils.Metrics())
	mb.metricFlinkJvmMemoryDirectUsed.emit(ils.Metrics())
	mb.metricFlinkJvmMemoryHeapCommitted.emit(ils.Metrics())
	mb.metricFlinkJvmMemoryHeapMax.emit(ils.Metrics())
	mb.metricFlinkJvmMemoryHeapUsed.emit(ils.Metrics())
	mb.metricFlinkJvmMemoryMappedTotalCapacity.emit(ils.Metrics())
	mb.metricFlinkJvmMemoryMappedUsed.emit(ils.Metrics())
	mb.metricFlinkJvmMemoryMetaspaceCommitted.emit(ils.Metrics())
	mb.metricFlinkJvmMemoryMetaspaceMax.emit(ils.Metrics())
	mb.metricFlinkJvmMemoryMetaspaceUsed.emit(ils.Metrics())
	mb.metricFlinkJvmMemoryNonheapCommitted.emit(ils.Metrics())
	mb.metricFlinkJvmMemoryNonheapMax.emit(ils.Metrics())
	mb.metricFlinkJvmMemoryNonheapUsed.emit(ils.Metrics())
	mb.metricFlinkJvmThreadsCount.emit(ils.Metrics())
	mb.metricFlinkMemoryManagedTotal.emit(ils.Metrics())
	mb.metricFlinkMemoryManagedUsed.emit(ils.Metrics())
	mb.metricFlinkOperatorRecordCount.emit(ils.Metrics())
	mb.metricFlinkOperatorWatermarkOutput.emit(ils.Metrics())
	mb.metricFlinkTaskRecordCount.emit(ils.Metrics())
	// Apply caller options (e.g. WithResource, WithStartTimeOverride) after the
	// metrics are in place so they see the fully populated ResourceMetrics.
	for _, op := range options {
		op.apply(rm)
	}
	// Discard the whole resource if an include filter exists for an attribute
	// and the attribute's value does not match it.
	// NOTE(review): the loop variable shadows the imported `filter` package here.
	for attr, filter := range mb.resourceAttributeIncludeFilter {
		if val, ok := rm.Resource().Attributes().Get(attr); ok && !filter.Matches(val.AsString()) {
			return
		}
	}
	// Discard the whole resource if an exclude filter matches an attribute value.
	for attr, filter := range mb.resourceAttributeExcludeFilter {
		if val, ok := rm.Resource().Attributes().Get(attr); ok && filter.Matches(val.AsString()) {
			return
		}
	}
	// Only non-empty resources are buffered for the next Emit call.
	if ils.Metrics().Len() > 0 {
		mb.updateCapacity(rm)
		rm.MoveTo(mb.metricsBuffer.ResourceMetrics().AppendEmpty())
	}
}
// Emit returns all the metrics accumulated by the metrics builder and updates the internal state to be ready for
// recording another set of metrics. This function will be responsible for applying all the transformations required to
// produce metric representation defined in metadata and user config, e.g. delta or cumulative.
func (mb *MetricsBuilder) Emit(options ...ResourceMetricsOption) pmetric.Metrics {
	mb.EmitForResource(options...)
	out := mb.metricsBuffer
	mb.metricsBuffer = pmetric.NewMetrics()
	return out
}
// RecordFlinkJobCheckpointCountDataPoint parses inputVal as int64 and records one point on the flink.job.checkpoint.count metric.
func (mb *MetricsBuilder) RecordFlinkJobCheckpointCountDataPoint(ts pcommon.Timestamp, inputVal string, checkpointAttributeValue AttributeCheckpoint) error {
	parsed, parseErr := strconv.ParseInt(inputVal, 10, 64)
	if parseErr == nil {
		mb.metricFlinkJobCheckpointCount.recordDataPoint(mb.startTime, ts, parsed, checkpointAttributeValue.String())
		return nil
	}
	return fmt.Errorf("failed to parse int64 for FlinkJobCheckpointCount, value was %s: %w", inputVal, parseErr)
}
// RecordFlinkJobCheckpointInProgressDataPoint parses inputVal as int64 and records one point on the flink.job.checkpoint.in_progress metric.
func (mb *MetricsBuilder) RecordFlinkJobCheckpointInProgressDataPoint(ts pcommon.Timestamp, inputVal string) error {
	parsed, parseErr := strconv.ParseInt(inputVal, 10, 64)
	if parseErr == nil {
		mb.metricFlinkJobCheckpointInProgress.recordDataPoint(mb.startTime, ts, parsed)
		return nil
	}
	return fmt.Errorf("failed to parse int64 for FlinkJobCheckpointInProgress, value was %s: %w", inputVal, parseErr)
}
// RecordFlinkJobLastCheckpointSizeDataPoint parses inputVal as int64 and records one point on the flink.job.last_checkpoint.size metric.
func (mb *MetricsBuilder) RecordFlinkJobLastCheckpointSizeDataPoint(ts pcommon.Timestamp, inputVal string) error {
	parsed, parseErr := strconv.ParseInt(inputVal, 10, 64)
	if parseErr == nil {
		mb.metricFlinkJobLastCheckpointSize.recordDataPoint(mb.startTime, ts, parsed)
		return nil
	}
	return fmt.Errorf("failed to parse int64 for FlinkJobLastCheckpointSize, value was %s: %w", inputVal, parseErr)
}
// RecordFlinkJobLastCheckpointTimeDataPoint parses inputVal as int64 and records one point on the flink.job.last_checkpoint.time metric.
func (mb *MetricsBuilder) RecordFlinkJobLastCheckpointTimeDataPoint(ts pcommon.Timestamp, inputVal string) error {
	parsed, parseErr := strconv.ParseInt(inputVal, 10, 64)
	if parseErr == nil {
		mb.metricFlinkJobLastCheckpointTime.recordDataPoint(mb.startTime, ts, parsed)
		return nil
	}
	return fmt.Errorf("failed to parse int64 for FlinkJobLastCheckpointTime, value was %s: %w", inputVal, parseErr)
}
// RecordFlinkJobRestartCountDataPoint parses inputVal as int64 and records one point on the flink.job.restart.count metric.
func (mb *MetricsBuilder) RecordFlinkJobRestartCountDataPoint(ts pcommon.Timestamp, inputVal string) error {
	parsed, parseErr := strconv.ParseInt(inputVal, 10, 64)
	if parseErr == nil {
		mb.metricFlinkJobRestartCount.recordDataPoint(mb.startTime, ts, parsed)
		return nil
	}
	return fmt.Errorf("failed to parse int64 for FlinkJobRestartCount, value was %s: %w", inputVal, parseErr)
}
// RecordFlinkJvmClassLoaderClassesLoadedDataPoint parses inputVal as int64 and records one point on the flink.jvm.class_loader.classes_loaded metric.
func (mb *MetricsBuilder) RecordFlinkJvmClassLoaderClassesLoadedDataPoint(ts pcommon.Timestamp, inputVal string) error {
	parsed, parseErr := strconv.ParseInt(inputVal, 10, 64)
	if parseErr == nil {
		mb.metricFlinkJvmClassLoaderClassesLoaded.recordDataPoint(mb.startTime, ts, parsed)
		return nil
	}
	return fmt.Errorf("failed to parse int64 for FlinkJvmClassLoaderClassesLoaded, value was %s: %w", inputVal, parseErr)
}
// RecordFlinkJvmCPULoadDataPoint parses inputVal as float64 and records one point on the flink.jvm.cpu.load metric.
func (mb *MetricsBuilder) RecordFlinkJvmCPULoadDataPoint(ts pcommon.Timestamp, inputVal string) error {
	parsed, parseErr := strconv.ParseFloat(inputVal, 64)
	if parseErr == nil {
		mb.metricFlinkJvmCPULoad.recordDataPoint(mb.startTime, ts, parsed)
		return nil
	}
	return fmt.Errorf("failed to parse float64 for FlinkJvmCPULoad, value was %s: %w", inputVal, parseErr)
}
// RecordFlinkJvmCPUTimeDataPoint parses inputVal as int64 and records one point on the flink.jvm.cpu.time metric.
func (mb *MetricsBuilder) RecordFlinkJvmCPUTimeDataPoint(ts pcommon.Timestamp, inputVal string) error {
	parsed, parseErr := strconv.ParseInt(inputVal, 10, 64)
	if parseErr == nil {
		mb.metricFlinkJvmCPUTime.recordDataPoint(mb.startTime, ts, parsed)
		return nil
	}
	return fmt.Errorf("failed to parse int64 for FlinkJvmCPUTime, value was %s: %w", inputVal, parseErr)
}
// RecordFlinkJvmGcCollectionsCountDataPoint parses inputVal as int64 and records one point on the flink.jvm.gc.collections.count metric.
func (mb *MetricsBuilder) RecordFlinkJvmGcCollectionsCountDataPoint(ts pcommon.Timestamp, inputVal string, garbageCollectorNameAttributeValue AttributeGarbageCollectorName) error {
	parsed, parseErr := strconv.ParseInt(inputVal, 10, 64)
	if parseErr == nil {
		mb.metricFlinkJvmGcCollectionsCount.recordDataPoint(mb.startTime, ts, parsed, garbageCollectorNameAttributeValue.String())
		return nil
	}
	return fmt.Errorf("failed to parse int64 for FlinkJvmGcCollectionsCount, value was %s: %w", inputVal, parseErr)
}
// RecordFlinkJvmGcCollectionsTimeDataPoint parses inputVal as int64 and records one point on the flink.jvm.gc.collections.time metric.
func (mb *MetricsBuilder) RecordFlinkJvmGcCollectionsTimeDataPoint(ts pcommon.Timestamp, inputVal string, garbageCollectorNameAttributeValue AttributeGarbageCollectorName) error {
	parsed, parseErr := strconv.ParseInt(inputVal, 10, 64)
	if parseErr == nil {
		mb.metricFlinkJvmGcCollectionsTime.recordDataPoint(mb.startTime, ts, parsed, garbageCollectorNameAttributeValue.String())
		return nil
	}
	return fmt.Errorf("failed to parse int64 for FlinkJvmGcCollectionsTime, value was %s: %w", inputVal, parseErr)
}
// RecordFlinkJvmMemoryDirectTotalCapacityDataPoint parses inputVal as int64 and records one point on the flink.jvm.memory.direct.total_capacity metric.
func (mb *MetricsBuilder) RecordFlinkJvmMemoryDirectTotalCapacityDataPoint(ts pcommon.Timestamp, inputVal string) error {
	parsed, parseErr := strconv.ParseInt(inputVal, 10, 64)
	if parseErr == nil {
		mb.metricFlinkJvmMemoryDirectTotalCapacity.recordDataPoint(mb.startTime, ts, parsed)
		return nil
	}
	return fmt.Errorf("failed to parse int64 for FlinkJvmMemoryDirectTotalCapacity, value was %s: %w", inputVal, parseErr)
}
// RecordFlinkJvmMemoryDirectUsedDataPoint parses inputVal as int64 and records one point on the flink.jvm.memory.direct.used metric.
func (mb *MetricsBuilder) RecordFlinkJvmMemoryDirectUsedDataPoint(ts pcommon.Timestamp, inputVal string) error {
	parsed, parseErr := strconv.ParseInt(inputVal, 10, 64)
	if parseErr == nil {
		mb.metricFlinkJvmMemoryDirectUsed.recordDataPoint(mb.startTime, ts, parsed)
		return nil
	}
	return fmt.Errorf("failed to parse int64 for FlinkJvmMemoryDirectUsed, value was %s: %w", inputVal, parseErr)
}
// RecordFlinkJvmMemoryHeapCommittedDataPoint parses inputVal as int64 and records one point on the flink.jvm.memory.heap.committed metric.
func (mb *MetricsBuilder) RecordFlinkJvmMemoryHeapCommittedDataPoint(ts pcommon.Timestamp, inputVal string) error {
	parsed, parseErr := strconv.ParseInt(inputVal, 10, 64)
	if parseErr == nil {
		mb.metricFlinkJvmMemoryHeapCommitted.recordDataPoint(mb.startTime, ts, parsed)
		return nil
	}
	return fmt.Errorf("failed to parse int64 for FlinkJvmMemoryHeapCommitted, value was %s: %w", inputVal, parseErr)
}
// RecordFlinkJvmMemoryHeapMaxDataPoint parses inputVal as int64 and records one point on the flink.jvm.memory.heap.max metric.
func (mb *MetricsBuilder) RecordFlinkJvmMemoryHeapMaxDataPoint(ts pcommon.Timestamp, inputVal string) error {
	parsed, parseErr := strconv.ParseInt(inputVal, 10, 64)
	if parseErr == nil {
		mb.metricFlinkJvmMemoryHeapMax.recordDataPoint(mb.startTime, ts, parsed)
		return nil
	}
	return fmt.Errorf("failed to parse int64 for FlinkJvmMemoryHeapMax, value was %s: %w", inputVal, parseErr)
}
// RecordFlinkJvmMemoryHeapUsedDataPoint parses inputVal as int64 and records one point on the flink.jvm.memory.heap.used metric.
func (mb *MetricsBuilder) RecordFlinkJvmMemoryHeapUsedDataPoint(ts pcommon.Timestamp, inputVal string) error {
	parsed, parseErr := strconv.ParseInt(inputVal, 10, 64)
	if parseErr == nil {
		mb.metricFlinkJvmMemoryHeapUsed.recordDataPoint(mb.startTime, ts, parsed)
		return nil
	}
	return fmt.Errorf("failed to parse int64 for FlinkJvmMemoryHeapUsed, value was %s: %w", inputVal, parseErr)
}
// RecordFlinkJvmMemoryMappedTotalCapacityDataPoint parses inputVal as int64 and records one point on the flink.jvm.memory.mapped.total_capacity metric.
func (mb *MetricsBuilder) RecordFlinkJvmMemoryMappedTotalCapacityDataPoint(ts pcommon.Timestamp, inputVal string) error {
	parsed, parseErr := strconv.ParseInt(inputVal, 10, 64)
	if parseErr == nil {
		mb.metricFlinkJvmMemoryMappedTotalCapacity.recordDataPoint(mb.startTime, ts, parsed)
		return nil
	}
	return fmt.Errorf("failed to parse int64 for FlinkJvmMemoryMappedTotalCapacity, value was %s: %w", inputVal, parseErr)
}
// RecordFlinkJvmMemoryMappedUsedDataPoint parses inputVal as int64 and records one point on the flink.jvm.memory.mapped.used metric.
func (mb *MetricsBuilder) RecordFlinkJvmMemoryMappedUsedDataPoint(ts pcommon.Timestamp, inputVal string) error {
	parsed, parseErr := strconv.ParseInt(inputVal, 10, 64)
	if parseErr == nil {
		mb.metricFlinkJvmMemoryMappedUsed.recordDataPoint(mb.startTime, ts, parsed)
		return nil
	}
	return fmt.Errorf("failed to parse int64 for FlinkJvmMemoryMappedUsed, value was %s: %w", inputVal, parseErr)
}
// RecordFlinkJvmMemoryMetaspaceCommittedDataPoint parses inputVal as int64 and records one point on the flink.jvm.memory.metaspace.committed metric.
func (mb *MetricsBuilder) RecordFlinkJvmMemoryMetaspaceCommittedDataPoint(ts pcommon.Timestamp, inputVal string) error {
	parsed, parseErr := strconv.ParseInt(inputVal, 10, 64)
	if parseErr == nil {
		mb.metricFlinkJvmMemoryMetaspaceCommitted.recordDataPoint(mb.startTime, ts, parsed)
		return nil
	}
	return fmt.Errorf("failed to parse int64 for FlinkJvmMemoryMetaspaceCommitted, value was %s: %w", inputVal, parseErr)
}
// RecordFlinkJvmMemoryMetaspaceMaxDataPoint parses inputVal as int64 and records one point on the flink.jvm.memory.metaspace.max metric.
func (mb *MetricsBuilder) RecordFlinkJvmMemoryMetaspaceMaxDataPoint(ts pcommon.Timestamp, inputVal string) error {
	parsed, parseErr := strconv.ParseInt(inputVal, 10, 64)
	if parseErr == nil {
		mb.metricFlinkJvmMemoryMetaspaceMax.recordDataPoint(mb.startTime, ts, parsed)
		return nil
	}
	return fmt.Errorf("failed to parse int64 for FlinkJvmMemoryMetaspaceMax, value was %s: %w", inputVal, parseErr)
}
// RecordFlinkJvmMemoryMetaspaceUsedDataPoint parses inputVal as int64 and records one point on the flink.jvm.memory.metaspace.used metric.
func (mb *MetricsBuilder) RecordFlinkJvmMemoryMetaspaceUsedDataPoint(ts pcommon.Timestamp, inputVal string) error {
	parsed, parseErr := strconv.ParseInt(inputVal, 10, 64)
	if parseErr == nil {
		mb.metricFlinkJvmMemoryMetaspaceUsed.recordDataPoint(mb.startTime, ts, parsed)
		return nil
	}
	return fmt.Errorf("failed to parse int64 for FlinkJvmMemoryMetaspaceUsed, value was %s: %w", inputVal, parseErr)
}
// RecordFlinkJvmMemoryNonheapCommittedDataPoint parses inputVal as int64 and records one point on the flink.jvm.memory.nonheap.committed metric.
func (mb *MetricsBuilder) RecordFlinkJvmMemoryNonheapCommittedDataPoint(ts pcommon.Timestamp, inputVal string) error {
	parsed, parseErr := strconv.ParseInt(inputVal, 10, 64)
	if parseErr == nil {
		mb.metricFlinkJvmMemoryNonheapCommitted.recordDataPoint(mb.startTime, ts, parsed)
		return nil
	}
	return fmt.Errorf("failed to parse int64 for FlinkJvmMemoryNonheapCommitted, value was %s: %w", inputVal, parseErr)
}
// RecordFlinkJvmMemoryNonheapMaxDataPoint parses inputVal as int64 and records one point on the flink.jvm.memory.nonheap.max metric.
func (mb *MetricsBuilder) RecordFlinkJvmMemoryNonheapMaxDataPoint(ts pcommon.Timestamp, inputVal string) error {
	parsed, parseErr := strconv.ParseInt(inputVal, 10, 64)
	if parseErr == nil {
		mb.metricFlinkJvmMemoryNonheapMax.recordDataPoint(mb.startTime, ts, parsed)
		return nil
	}
	return fmt.Errorf("failed to parse int64 for FlinkJvmMemoryNonheapMax, value was %s: %w", inputVal, parseErr)
}
// RecordFlinkJvmMemoryNonheapUsedDataPoint parses inputVal as int64 and records one point on the flink.jvm.memory.nonheap.used metric.
func (mb *MetricsBuilder) RecordFlinkJvmMemoryNonheapUsedDataPoint(ts pcommon.Timestamp, inputVal string) error {
	parsed, parseErr := strconv.ParseInt(inputVal, 10, 64)
	if parseErr == nil {
		mb.metricFlinkJvmMemoryNonheapUsed.recordDataPoint(mb.startTime, ts, parsed)
		return nil
	}
	return fmt.Errorf("failed to parse int64 for FlinkJvmMemoryNonheapUsed, value was %s: %w", inputVal, parseErr)
}
// RecordFlinkJvmThreadsCountDataPoint parses inputVal as int64 and records one point on the flink.jvm.threads.count metric.
func (mb *MetricsBuilder) RecordFlinkJvmThreadsCountDataPoint(ts pcommon.Timestamp, inputVal string) error {
	parsed, parseErr := strconv.ParseInt(inputVal, 10, 64)
	if parseErr == nil {
		mb.metricFlinkJvmThreadsCount.recordDataPoint(mb.startTime, ts, parsed)
		return nil
	}
	return fmt.Errorf("failed to parse int64 for FlinkJvmThreadsCount, value was %s: %w", inputVal, parseErr)
}
// RecordFlinkMemoryManagedTotalDataPoint parses inputVal as int64 and records one point on the flink.memory.managed.total metric.
func (mb *MetricsBuilder) RecordFlinkMemoryManagedTotalDataPoint(ts pcommon.Timestamp, inputVal string) error {
	parsed, parseErr := strconv.ParseInt(inputVal, 10, 64)
	if parseErr == nil {
		mb.metricFlinkMemoryManagedTotal.recordDataPoint(mb.startTime, ts, parsed)
		return nil
	}
	return fmt.Errorf("failed to parse int64 for FlinkMemoryManagedTotal, value was %s: %w", inputVal, parseErr)
}
// RecordFlinkMemoryManagedUsedDataPoint parses inputVal as int64 and records one point on the flink.memory.managed.used metric.
func (mb *MetricsBuilder) RecordFlinkMemoryManagedUsedDataPoint(ts pcommon.Timestamp, inputVal string) error {
	parsed, parseErr := strconv.ParseInt(inputVal, 10, 64)
	if parseErr == nil {
		mb.metricFlinkMemoryManagedUsed.recordDataPoint(mb.startTime, ts, parsed)
		return nil
	}
	return fmt.Errorf("failed to parse int64 for FlinkMemoryManagedUsed, value was %s: %w", inputVal, parseErr)
}
// RecordFlinkOperatorRecordCountDataPoint parses inputVal as int64 and records one point on the flink.operator.record.count metric.
func (mb *MetricsBuilder) RecordFlinkOperatorRecordCountDataPoint(ts pcommon.Timestamp, inputVal string, operatorNameAttributeValue string, recordAttributeValue AttributeRecord) error {
	parsed, parseErr := strconv.ParseInt(inputVal, 10, 64)
	if parseErr == nil {
		mb.metricFlinkOperatorRecordCount.recordDataPoint(mb.startTime, ts, parsed, operatorNameAttributeValue, recordAttributeValue.String())
		return nil
	}
	return fmt.Errorf("failed to parse int64 for FlinkOperatorRecordCount, value was %s: %w", inputVal, parseErr)
}
// RecordFlinkOperatorWatermarkOutputDataPoint parses inputVal as int64 and records one point on the flink.operator.watermark.output metric.
func (mb *MetricsBuilder) RecordFlinkOperatorWatermarkOutputDataPoint(ts pcommon.Timestamp, inputVal string, operatorNameAttributeValue string) error {
	parsed, parseErr := strconv.ParseInt(inputVal, 10, 64)
	if parseErr == nil {
		mb.metricFlinkOperatorWatermarkOutput.recordDataPoint(mb.startTime, ts, parsed, operatorNameAttributeValue)
		return nil
	}
	return fmt.Errorf("failed to parse int64 for FlinkOperatorWatermarkOutput, value was %s: %w", inputVal, parseErr)
}
// RecordFlinkTaskRecordCountDataPoint parses inputVal as int64 and records one point on the flink.task.record.count metric.
func (mb *MetricsBuilder) RecordFlinkTaskRecordCountDataPoint(ts pcommon.Timestamp, inputVal string, recordAttributeValue AttributeRecord) error {
	parsed, parseErr := strconv.ParseInt(inputVal, 10, 64)
	if parseErr == nil {
		mb.metricFlinkTaskRecordCount.recordDataPoint(mb.startTime, ts, parsed, recordAttributeValue.String())
		return nil
	}
	return fmt.Errorf("failed to parse int64 for FlinkTaskRecordCount, value was %s: %w", inputVal, parseErr)
}
// Reset resets metrics builder to its initial state. It should be used when external metrics source is restarted,
// and metrics builder should update its startTime and reset its internal state accordingly.
func (mb *MetricsBuilder) Reset(options ...MetricBuilderOption) {
	mb.startTime = pcommon.NewTimestampFromTime(time.Now())
	for _, opt := range options {
		opt.apply(mb)
	}
}