func()

in exporter/kineticaexporter/metrics_exporter.go [962:1146]


// createSumRecord converts a pmetric.Sum metric and its resource/scope
// metadata into a kineticaSumRecord: the sum row itself, one row per data
// point, one row per exemplar, and one row per data point, exemplar, resource
// and scope attribute. Every row is linked to the sum through a freshly
// generated SumID.
//
// Attributes with an empty key are logged at debug level and skipped.
// Attribute values that cannot be converted by
// attributeValueToKineticaFieldValue are logged at debug level, skipped, and
// collected into the returned error. Failures while building an individual
// attribute row are logged at error level and that row is skipped.
//
// The record is returned even when the error is non-nil; it then holds
// everything that could be converted.
func (e *kineticaMetricsExporter) createSumRecord(resAttr pcommon.Map, _ string, scopeInstr pcommon.InstrumentationScope, _ string, sumRecord pmetric.Sum, name, description, unit string) (*kineticaSumRecord, error) {
	var errs []error

	kiSumRecord := new(kineticaSumRecord)
	var isMonotonic int8
	if sumRecord.IsMonotonic() {
		isMonotonic = 1
	}

	sum := &Sum{
		SumID:                  uuid.New().String(),
		MetricName:             name,
		Description:            description,
		Unit:                   unit,
		AggregationTemporality: sumRecord.AggregationTemporality(),
		IsMonotonic:            isMonotonic,
	}
	kiSumRecord.sum = sum

	// Scratch maps reused for every data point / exemplar. They are cleared
	// after each use so attributes never leak from one item into the next.
	sumDatapointAttributes := make(map[string]ValueTypePair)
	exemplarAttributes := make(map[string]ValueTypePair)

	// Handle data points
	dataPoints := sumRecord.DataPoints()
	for i := 0; i < dataPoints.Len(); i++ {
		datapoint := dataPoints.At(i)

		// DoubleValue only carries the value of double-typed points, so
		// integer-valued points are converted explicitly instead of being
		// stored as 0.
		sumValue := datapoint.DoubleValue()
		if datapoint.ValueType() == pmetric.NumberDataPointValueTypeInt {
			sumValue = float64(datapoint.IntValue())
		}

		sumDatapoint := SumDatapoint{
			SumID:         sum.SumID,
			ID:            uuid.New().String(),
			StartTimeUnix: datapoint.StartTimestamp().AsTime().UnixMilli(),
			TimeUnix:      datapoint.Timestamp().AsTime().UnixMilli(),
			SumValue:      sumValue,
			Flags:         int(datapoint.Flags()),
		}
		kiSumRecord.datapoint = append(kiSumRecord.datapoint, sumDatapoint)

		// Handle Sum attribute
		for k, v := range datapoint.Attributes().All() {
			if k == "" {
				e.logger.Debug("Sum record attribute key is empty")
				continue
			}
			converted, err := attributeValueToKineticaFieldValue(v)
			if err != nil {
				e.logger.Debug("Invalid sum record attribute value", zap.String("Error", err.Error()))
				errs = append(errs, err)
				continue
			}
			sumDatapointAttributes[k] = converted
		}

		// Append straight to the record: accumulating into a slice shared by
		// all iterations would re-emit the rows of earlier data points.
		for key, vtPair := range sumDatapointAttributes {
			sa, err := e.newSumDatapointAttributeValue(sum.SumID, sumDatapoint.ID, key, vtPair)
			if err != nil {
				e.logger.Error(err.Error())
				continue
			}
			kiSumRecord.datapointAttribute = append(kiSumRecord.datapointAttribute, *sa)
		}
		clear(sumDatapointAttributes)

		// Handle data point exemplars
		exemplars := datapoint.Exemplars()
		for j := 0; j < exemplars.Len(); j++ {
			exemplar := exemplars.At(j)

			// Same int/double distinction as for the data point value.
			exemplarValue := exemplar.DoubleValue()
			if exemplar.ValueType() == pmetric.ExemplarValueTypeInt {
				exemplarValue = float64(exemplar.IntValue())
			}

			sumDatapointExemplar := SumDatapointExemplar{
				SumID:       sum.SumID,
				DatapointID: sumDatapoint.ID,
				ExemplarID:  uuid.New().String(),
				TimeUnix:    exemplar.Timestamp().AsTime().UnixMilli(),
				SumValue:    exemplarValue,
				TraceID:     exemplar.TraceID().String(),
				SpanID:      exemplar.SpanID().String(),
			}
			kiSumRecord.exemplars = append(kiSumRecord.exemplars, sumDatapointExemplar)

			// Handle Exemplar attribute
			for k, v := range exemplar.FilteredAttributes().All() {
				if k == "" {
					e.logger.Debug("Sum record attribute key is empty")
					continue
				}
				converted, err := attributeValueToKineticaFieldValue(v)
				if err != nil {
					e.logger.Debug("Invalid sum record attribute value", zap.String("Error", err.Error()))
					errs = append(errs, err)
					continue
				}
				exemplarAttributes[k] = converted
			}

			for key, vtPair := range exemplarAttributes {
				ea, err := e.newSumDatapointExemplarAttributeValue(sum.SumID, sumDatapoint.ID, sumDatapointExemplar.ExemplarID, key, vtPair)
				if err != nil {
					e.logger.Error(err.Error())
					continue
				}
				kiSumRecord.exemplarAttribute = append(kiSumRecord.exemplarAttribute, *ea)
			}
			clear(exemplarAttributes)
		}
	}

	// Handle Resource attribute
	if resAttr.Len() > 0 {
		resourceAttributes := make(map[string]ValueTypePair, resAttr.Len())
		for k, v := range resAttr.All() {
			if k == "" {
				e.logger.Debug("Resource attribute key is empty")
				continue
			}
			converted, err := attributeValueToKineticaFieldValue(v)
			if err != nil {
				e.logger.Debug("Invalid resource attribute value", zap.String("Error", err.Error()))
				errs = append(errs, err)
				continue
			}
			resourceAttributes[k] = converted
		}

		kiSumRecord.sumResourceAttribute = make([]SumResourceAttribute, 0, len(resourceAttributes))
		for key, vtPair := range resourceAttributes {
			ga, err := e.newSumResourceAttributeValue(sum.SumID, key, vtPair)
			if err != nil {
				e.logger.Error(err.Error())
				continue
			}
			kiSumRecord.sumResourceAttribute = append(kiSumRecord.sumResourceAttribute, *ga)
		}
	}

	// Handle Scope attribute
	scopeName := scopeInstr.Name()
	scopeVersion := scopeInstr.Version()

	if scopeInstr.Attributes().Len() > 0 {
		scopeAttributes := make(map[string]ValueTypePair, scopeInstr.Attributes().Len())
		for k, v := range scopeInstr.Attributes().All() {
			if k == "" {
				e.logger.Debug("Scope attribute key is empty")
				continue
			}
			converted, err := attributeValueToKineticaFieldValue(v)
			if err != nil {
				e.logger.Debug("Invalid scope attribute value", zap.String("Error", err.Error()))
				errs = append(errs, err)
				continue
			}
			scopeAttributes[k] = converted
		}

		// The destination must be allocated before it is filled: copying into
		// the record's nil slice copies nothing and silently drops every
		// scope attribute.
		kiSumRecord.sumScopeAttribute = make([]SumScopeAttribute, 0, len(scopeAttributes))
		for key, vtPair := range scopeAttributes {
			sa, err := e.newSumScopeAttributeValue(sum.SumID, key, scopeName, scopeVersion, vtPair)
			if err != nil {
				e.logger.Error(err.Error())
				continue
			}
			kiSumRecord.sumScopeAttribute = append(kiSumRecord.sumScopeAttribute, *sa)
		}
	} else {
		// No attributes found - emit a single row with an empty key so the
		// scope name and version are still recorded.
		kiSumRecord.sumScopeAttribute = append(kiSumRecord.sumScopeAttribute, SumScopeAttribute{
			SumID:          sum.SumID,
			ScopeName:      scopeName,
			ScopeVersion:   scopeVersion,
			Key:            "",
			AttributeValue: AttributeValue{},
		})
	}

	return kiSumRecord, multierr.Combine(errs...)
}