in exporter/kineticaexporter/metrics_exporter.go [962:1146]
func (e *kineticaMetricsExporter) createSumRecord(resAttr pcommon.Map, _ string, scopeInstr pcommon.InstrumentationScope, _ string, sumRecord pmetric.Sum, name, description, unit string) (*kineticaSumRecord, error) {
var errs []error
kiSumRecord := new(kineticaSumRecord)
var isMonotonic int8
if sumRecord.IsMonotonic() {
isMonotonic = 1
}
sum := &Sum{
SumID: uuid.New().String(),
MetricName: name,
Description: description,
Unit: unit,
AggregationTemporality: sumRecord.AggregationTemporality(),
IsMonotonic: isMonotonic,
}
kiSumRecord.sum = sum
// Handle data points
var sumDatapointAttribute []SumDataPointAttribute
sumDatapointAttributes := make(map[string]ValueTypePair)
var exemplarAttribute []SumDataPointExemplarAttribute
exemplarAttributes := make(map[string]ValueTypePair)
for i := 0; i < sumRecord.DataPoints().Len(); i++ {
datapoint := sumRecord.DataPoints().At(i)
sumDatapoint := SumDatapoint{
SumID: sum.SumID,
ID: uuid.New().String(),
StartTimeUnix: datapoint.StartTimestamp().AsTime().UnixMilli(),
TimeUnix: datapoint.Timestamp().AsTime().UnixMilli(),
SumValue: datapoint.DoubleValue(),
Flags: int(datapoint.Flags()),
}
kiSumRecord.datapoint = append(kiSumRecord.datapoint, sumDatapoint)
// Handle Sum attribute
for k, v := range datapoint.Attributes().All() {
if k == "" {
e.logger.Debug("Sum record attribute key is empty")
} else if v, err := attributeValueToKineticaFieldValue(v); err == nil {
sumDatapointAttributes[k] = v
} else {
e.logger.Debug("Invalid sum record attribute value", zap.String("Error", err.Error()))
errs = append(errs, err)
}
}
for key := range sumDatapointAttributes {
vtPair := sumDatapointAttributes[key]
sa, err := e.newSumDatapointAttributeValue(sum.SumID, sumDatapoint.ID, key, vtPair)
if err != nil {
e.logger.Error(err.Error())
} else {
sumDatapointAttribute = append(sumDatapointAttribute, *sa)
}
}
kiSumRecord.datapointAttribute = append(kiSumRecord.datapointAttribute, sumDatapointAttribute...)
for k := range sumDatapointAttributes {
delete(sumDatapointAttributes, k)
}
// Handle data point exemplars
exemplars := datapoint.Exemplars()
for i := 0; i < exemplars.Len(); i++ {
exemplar := exemplars.At(i)
sumDatapointExemplar := SumDatapointExemplar{
SumID: sum.SumID,
DatapointID: sumDatapoint.ID,
ExemplarID: uuid.New().String(),
TimeUnix: exemplar.Timestamp().AsTime().UnixMilli(),
SumValue: exemplar.DoubleValue(),
TraceID: exemplar.TraceID().String(),
SpanID: exemplar.SpanID().String(),
}
kiSumRecord.exemplars = append(kiSumRecord.exemplars, sumDatapointExemplar)
// Handle Exemplar attribute
for k, v := range exemplar.FilteredAttributes().All() {
if k == "" {
e.logger.Debug("Sum record attribute key is empty")
} else if v, err := attributeValueToKineticaFieldValue(v); err == nil {
exemplarAttributes[k] = v
} else {
e.logger.Debug("Invalid sum record attribute value", zap.String("Error", err.Error()))
errs = append(errs, err)
}
}
for key := range exemplarAttributes {
vtPair := exemplarAttributes[key]
ea, err := e.newSumDatapointExemplarAttributeValue(sum.SumID, sumDatapoint.ID, sumDatapointExemplar.ExemplarID, key, vtPair)
if err != nil {
e.logger.Error(err.Error())
} else {
exemplarAttribute = append(exemplarAttribute, *ea)
}
}
kiSumRecord.exemplarAttribute = append(kiSumRecord.exemplarAttribute, exemplarAttribute...)
for k := range exemplarAttributes {
delete(exemplarAttributes, k)
}
}
}
// Handle Resource attribute
var resourceAttribute []SumResourceAttribute
resourceAttributes := make(map[string]ValueTypePair)
if resAttr.Len() > 0 {
for k, v := range resAttr.All() {
if k == "" {
e.logger.Debug("Resource attribute key is empty")
} else if v, err := attributeValueToKineticaFieldValue(v); err == nil {
resourceAttributes[k] = v
} else {
e.logger.Debug("Invalid resource attribute value", zap.String("Error", err.Error()))
errs = append(errs, err)
}
}
for key := range resourceAttributes {
vtPair := resourceAttributes[key]
ga, err := e.newSumResourceAttributeValue(sum.SumID, key, vtPair)
if err != nil {
e.logger.Error(err.Error())
} else {
resourceAttribute = append(resourceAttribute, *ga)
}
}
kiSumRecord.sumResourceAttribute = make([]SumResourceAttribute, len(resourceAttribute))
copy(kiSumRecord.sumResourceAttribute, resourceAttribute)
}
// Handle Scope attribute
var scopeAttribute []SumScopeAttribute
scopeAttributes := make(map[string]ValueTypePair)
scopeName := scopeInstr.Name()
scopeVersion := scopeInstr.Version()
if scopeInstr.Attributes().Len() > 0 {
for k, v := range scopeInstr.Attributes().All() {
if k == "" {
e.logger.Debug("Scope attribute key is empty")
} else if v, err := attributeValueToKineticaFieldValue(v); err == nil {
scopeAttributes[k] = v
} else {
e.logger.Debug("Invalid scope attribute value", zap.String("Error", err.Error()))
errs = append(errs, err)
}
}
for key := range scopeAttributes {
vtPair := scopeAttributes[key]
sa, err := e.newSumScopeAttributeValue(sum.SumID, key, scopeName, scopeVersion, vtPair)
if err != nil {
e.logger.Error(err.Error())
} else {
scopeAttribute = append(scopeAttribute, *sa)
}
}
copy(kiSumRecord.sumScopeAttribute, scopeAttribute)
} else {
// No attributes found - just basic scope
kiSumRecord.sumScopeAttribute = append(kiSumRecord.sumScopeAttribute, SumScopeAttribute{
SumID: sum.SumID,
ScopeName: scopeName,
ScopeVersion: scopeVersion,
Key: "",
AttributeValue: AttributeValue{},
})
}
return kiSumRecord, multierr.Combine(errs...)
}