in exporter/kineticaexporter/metrics_exporter.go [535:733]
// createExponentialHistogramRecord flattens one OTLP exponential histogram
// metric into the rows held by a kineticaExponentialHistogramRecord: the
// histogram itself, its data points, data point attributes, exemplars,
// exemplar attributes, positive/negative bucket counts, and the resource and
// scope attributes.
//
// Parameters:
//   - resAttr: resource attributes attached to the metric.
//   - scopeInstr: instrumentation scope; its name, version and attributes are
//     used for the scope attribute rows.
//   - exponentialHistogramRecord: the histogram whose data points are converted.
//   - name, description, unit: metric metadata copied onto the histogram row.
//   - the two unnamed string parameters are ignored.
//
// A fresh UUID is generated for the histogram and for every data point,
// exemplar and bucket count row; child rows carry the IDs of their parents.
//
// Attributes with an empty key are skipped (logged at debug level). Attribute
// values rejected by attributeValueToKineticaFieldValue are skipped and their
// errors collected; the combined error is returned together with the record,
// which still contains every row that could be built. Errors returned by the
// new...AttributeValue constructors are only logged, not returned.
func (e *kineticaMetricsExporter) createExponentialHistogramRecord(resAttr pcommon.Map, _ string, scopeInstr pcommon.InstrumentationScope, _ string, exponentialHistogramRecord pmetric.ExponentialHistogram, name, description, unit string) (*kineticaExponentialHistogramRecord, error) {
	var errs []error

	kiExpHistogramRecord := new(kineticaExponentialHistogramRecord)
	histogram := &ExponentialHistogram{
		HistogramID:            uuid.New().String(),
		MetricName:             name,
		Description:            description,
		Unit:                   unit,
		AggregationTemporality: exponentialHistogramRecord.AggregationTemporality(),
	}
	kiExpHistogramRecord.histogram = histogram

	// Scratch maps, keyed by attribute name, reused for every data point and
	// exemplar. They are cleared after each use so that no attribute leaks
	// from one data point/exemplar into the next.
	datapointAttributes := make(map[string]ValueTypePair)
	exemplarAttributes := make(map[string]ValueTypePair)

	// Handle data points
	dataPoints := exponentialHistogramRecord.DataPoints()
	for i := 0; i < dataPoints.Len(); i++ {
		datapoint := dataPoints.At(i)
		expHistogramDatapoint := ExponentialHistogramDatapoint{
			HistogramID:           histogram.HistogramID,
			ID:                    uuid.New().String(),
			StartTimeUnix:         datapoint.StartTimestamp().AsTime().UnixMilli(),
			TimeUnix:              datapoint.Timestamp().AsTime().UnixMilli(),
			Count:                 int64(datapoint.Count()),
			Sum:                   datapoint.Sum(),
			Min:                   datapoint.Min(),
			Max:                   datapoint.Max(),
			Flags:                 int(datapoint.Flags()),
			Scale:                 int(datapoint.Scale()),
			ZeroCount:             int64(datapoint.ZeroCount()),
			BucketsPositiveOffset: int(datapoint.Positive().Offset()),
			BucketsNegativeOffset: int(datapoint.Negative().Offset()),
		}
		kiExpHistogramRecord.histogramDatapoint = append(kiExpHistogramRecord.histogramDatapoint, expHistogramDatapoint)

		// Handle histogram datapoint attributes
		for k, v := range datapoint.Attributes().All() {
			if k == "" {
				e.logger.Debug("Exponential histogram datapoint attribute key is empty")
				continue
			}
			vtPair, err := attributeValueToKineticaFieldValue(v)
			if err != nil {
				e.logger.Debug("Invalid exponential histogram datapoint attribute value", zap.String("Error", err.Error()))
				errs = append(errs, err)
				continue
			}
			datapointAttributes[k] = vtPair
		}
		// Rows are appended straight to the record so that each data point
		// contributes only its own attributes.
		for key, vtPair := range datapointAttributes {
			da, err := e.newExponentialHistogramDatapointAttributeValue(histogram.HistogramID, expHistogramDatapoint.ID, key, vtPair)
			if err != nil {
				e.logger.Error(err.Error())
				continue
			}
			kiExpHistogramRecord.histogramDatapointAttribute = append(kiExpHistogramRecord.histogramDatapointAttribute, *da)
		}
		clear(datapointAttributes)

		// Handle datapoint exemplars
		exemplars := datapoint.Exemplars()
		for j := 0; j < exemplars.Len(); j++ {
			exemplar := exemplars.At(j)
			datapointExemplar := ExponentialHistogramDatapointExemplar{
				HistogramID:    histogram.HistogramID,
				DatapointID:    expHistogramDatapoint.ID,
				ExemplarID:     uuid.New().String(),
				TimeUnix:       exemplar.Timestamp().AsTime().UnixMilli(),
				HistogramValue: exemplar.DoubleValue(),
				TraceID:        exemplar.TraceID().String(),
				SpanID:         exemplar.SpanID().String(),
			}
			kiExpHistogramRecord.exemplars = append(kiExpHistogramRecord.exemplars, datapointExemplar)

			// Handle exemplar attributes
			for k, v := range exemplar.FilteredAttributes().All() {
				if k == "" {
					e.logger.Debug("Exponential histogram exemplar attribute key is empty")
					continue
				}
				vtPair, err := attributeValueToKineticaFieldValue(v)
				if err != nil {
					e.logger.Debug("Invalid exponential histogram exemplar attribute value", zap.String("Error", err.Error()))
					errs = append(errs, err)
					continue
				}
				exemplarAttributes[k] = vtPair
			}
			for key, vtPair := range exemplarAttributes {
				ea, err := e.newExponentialHistogramDatapointExemplarAttributeValue(expHistogramDatapoint.HistogramID, expHistogramDatapoint.ID, datapointExemplar.ExemplarID, key, vtPair)
				if err != nil {
					e.logger.Error(err.Error())
					continue
				}
				kiExpHistogramRecord.exemplarAttribute = append(kiExpHistogramRecord.exemplarAttribute, *ea)
			}
			clear(exemplarAttributes)
		}

		// Handle positive and negative bucket counts
		positiveCounts := datapoint.Positive().BucketCounts()
		for j := 0; j < positiveCounts.Len(); j++ {
			kiExpHistogramRecord.histogramBucketPositiveCount = append(kiExpHistogramRecord.histogramBucketPositiveCount, ExponentialHistogramBucketPositiveCount{
				HistogramID: expHistogramDatapoint.HistogramID,
				DatapointID: expHistogramDatapoint.ID,
				CountID:     uuid.New().String(),
				Count:       int64(positiveCounts.At(j)),
			})
		}
		negativeCounts := datapoint.Negative().BucketCounts()
		for j := 0; j < negativeCounts.Len(); j++ {
			kiExpHistogramRecord.histogramBucketNegativeCount = append(kiExpHistogramRecord.histogramBucketNegativeCount, ExponentialHistogramBucketNegativeCount{
				HistogramID: expHistogramDatapoint.HistogramID,
				DatapointID: expHistogramDatapoint.ID,
				CountID:     uuid.New().String(),
				Count:       negativeCounts.At(j),
			})
		}
	}

	// Handle Resource attributes
	resourceAttributes := make(map[string]ValueTypePair)
	for k, v := range resAttr.All() {
		if k == "" {
			e.logger.Debug("Resource attribute key is empty")
			continue
		}
		vtPair, err := attributeValueToKineticaFieldValue(v)
		if err != nil {
			e.logger.Debug("Invalid resource attribute value", zap.String("Error", err.Error()))
			errs = append(errs, err)
			continue
		}
		resourceAttributes[k] = vtPair
	}
	// append (not copy) is required here: the destination slice on a freshly
	// allocated record is nil, and copy into a nil slice transfers nothing.
	for key, vtPair := range resourceAttributes {
		ra, err := e.newExponentialHistogramResourceAttributeValue(histogram.HistogramID, key, vtPair)
		if err != nil {
			e.logger.Error(err.Error())
			continue
		}
		kiExpHistogramRecord.histogramResourceAttribute = append(kiExpHistogramRecord.histogramResourceAttribute, *ra)
	}

	// Handle Scope attributes
	scopeAttributes := make(map[string]ValueTypePair)
	scopeName := scopeInstr.Name()
	scopeVersion := scopeInstr.Version()
	for k, v := range scopeInstr.Attributes().All() {
		if k == "" {
			e.logger.Debug("Scope attribute key is empty")
			continue
		}
		vtPair, err := attributeValueToKineticaFieldValue(v)
		if err != nil {
			e.logger.Debug("Invalid scope attribute value", zap.String("Error", err.Error()))
			errs = append(errs, err)
			continue
		}
		scopeAttributes[k] = vtPair
	}
	for key, vtPair := range scopeAttributes {
		sa, err := e.newExponentialHistogramScopeAttributeValue(histogram.HistogramID, key, scopeName, scopeVersion, vtPair)
		if err != nil {
			e.logger.Error(err.Error())
			continue
		}
		kiExpHistogramRecord.histogramScopeAttribute = append(kiExpHistogramRecord.histogramScopeAttribute, *sa)
	}

	return kiExpHistogramRecord, multierr.Combine(errs...)
}