func()

in exporter/kineticaexporter/metrics_exporter.go [535:733]


// createExponentialHistogramRecord converts an exponential histogram metric
// into a kineticaExponentialHistogramRecord.
//
// The histogram gets a freshly generated UUID. Every datapoint, exemplar and
// bucket count gets its own UUID and carries the ID of the histogram (and,
// where applicable, the datapoint and exemplar) it belongs to. Resource and
// scope attributes are attached to the histogram ID; scope attribute rows also
// receive the scope name and version.
//
// The two unnamed string parameters are ignored.
//
// Attributes with an empty key are skipped with a debug log. Attribute values
// rejected by attributeValueToKineticaFieldValue are skipped, logged at debug
// level, and their errors are returned combined through multierr. Failures
// while building an individual attribute row are logged at error level only.
// The record is returned even when the returned error is non-nil.
func (e *kineticaMetricsExporter) createExponentialHistogramRecord(resAttr pcommon.Map, _ string, scopeInstr pcommon.InstrumentationScope, _ string, exponentialHistogramRecord pmetric.ExponentialHistogram, name, description, unit string) (*kineticaExponentialHistogramRecord, error) {
	var errs []error

	kiExpHistogramRecord := new(kineticaExponentialHistogramRecord)

	histogram := &ExponentialHistogram{
		HistogramID:            uuid.New().String(),
		MetricName:             name,
		Description:            description,
		Unit:                   unit,
		AggregationTemporality: exponentialHistogramRecord.AggregationTemporality(),
	}

	kiExpHistogramRecord.histogram = histogram

	// collect converts every attribute of m that has a non-empty key. Values
	// that cannot be converted are logged, recorded in errs and left out of
	// the result. Keying the result by attribute name keeps a single value per
	// key, as the per-section maps of the previous implementation did.
	collect := func(m pcommon.Map, emptyKeyMsg, invalidValueMsg string) map[string]ValueTypePair {
		converted := make(map[string]ValueTypePair, m.Len())
		for k, v := range m.All() {
			if k == "" {
				e.logger.Debug(emptyKeyMsg)
				continue
			}
			pair, err := attributeValueToKineticaFieldValue(v)
			if err != nil {
				e.logger.Debug(invalidValueMsg, zap.String("Error", err.Error()))
				errs = append(errs, err)
				continue
			}
			converted[k] = pair
		}
		return converted
	}

	// Handle data points
	dataPoints := exponentialHistogramRecord.DataPoints()
	for i := 0; i < dataPoints.Len(); i++ {
		datapoint := dataPoints.At(i)

		expHistogramDatapoint := ExponentialHistogramDatapoint{
			HistogramID:           histogram.HistogramID,
			ID:                    uuid.New().String(),
			StartTimeUnix:         datapoint.StartTimestamp().AsTime().UnixMilli(),
			TimeUnix:              datapoint.Timestamp().AsTime().UnixMilli(),
			Count:                 int64(datapoint.Count()),
			Sum:                   datapoint.Sum(),
			Min:                   datapoint.Min(),
			Max:                   datapoint.Max(),
			Flags:                 int(datapoint.Flags()),
			Scale:                 int(datapoint.Scale()),
			ZeroCount:             int64(datapoint.ZeroCount()),
			BucketsPositiveOffset: int(datapoint.Positive().Offset()),
			BucketsNegativeOffset: int(datapoint.Negative().Offset()),
		}
		kiExpHistogramRecord.histogramDatapoint = append(kiExpHistogramRecord.histogramDatapoint, expHistogramDatapoint)

		// Handle histogram datapoint attributes. Rows are appended straight to
		// the record so that each datapoint contributes its attributes exactly
		// once; accumulating them in a slice shared across iterations would
		// re-append the rows of earlier datapoints.
		datapointAttributes := collect(
			datapoint.Attributes(),
			"Exponential histogram datapoint attribute key is empty",
			"Invalid exponential histogram datapoint attribute value",
		)
		for key, vtPair := range datapointAttributes {
			da, err := e.newExponentialHistogramDatapointAttributeValue(histogram.HistogramID, expHistogramDatapoint.ID, key, vtPair)
			if err != nil {
				e.logger.Error(err.Error())
				continue
			}
			kiExpHistogramRecord.histogramDatapointAttribute = append(kiExpHistogramRecord.histogramDatapointAttribute, *da)
		}

		// Handle datapoint exemplars
		exemplars := datapoint.Exemplars()
		for j := 0; j < exemplars.Len(); j++ {
			exemplar := exemplars.At(j)
			datapointExemplar := ExponentialHistogramDatapointExemplar{
				HistogramID:    histogram.HistogramID,
				DatapointID:    expHistogramDatapoint.ID,
				ExemplarID:     uuid.New().String(),
				TimeUnix:       exemplar.Timestamp().AsTime().UnixMilli(),
				HistogramValue: exemplar.DoubleValue(),
				TraceID:        exemplar.TraceID().String(),
				SpanID:         exemplar.SpanID().String(),
			}
			kiExpHistogramRecord.exemplars = append(kiExpHistogramRecord.exemplars, datapointExemplar)

			// Handle exemplar attributes, again one row per attribute of this
			// exemplar only.
			exemplarAttributes := collect(
				exemplar.FilteredAttributes(),
				"Exponential histogram exemplar attribute key is empty",
				"Invalid exponential histogram exemplar attribute value",
			)
			for key, vtPair := range exemplarAttributes {
				ea, err := e.newExponentialHistogramDatapointExemplarAttributeValue(histogram.HistogramID, expHistogramDatapoint.ID, datapointExemplar.ExemplarID, key, vtPair)
				if err != nil {
					e.logger.Error(err.Error())
					continue
				}
				kiExpHistogramRecord.exemplarAttribute = append(kiExpHistogramRecord.exemplarAttribute, *ea)
			}
		}

		// Handle positive and negative bucket counts; one row per bucket of
		// this datapoint.
		positiveCounts := datapoint.Positive().BucketCounts()
		for j := 0; j < positiveCounts.Len(); j++ {
			kiExpHistogramRecord.histogramBucketPositiveCount = append(kiExpHistogramRecord.histogramBucketPositiveCount, ExponentialHistogramBucketPositiveCount{
				HistogramID: histogram.HistogramID,
				DatapointID: expHistogramDatapoint.ID,
				CountID:     uuid.New().String(),
				Count:       int64(positiveCounts.At(j)),
			})
		}

		negativeCounts := datapoint.Negative().BucketCounts()
		for j := 0; j < negativeCounts.Len(); j++ {
			kiExpHistogramRecord.histogramBucketNegativeCount = append(kiExpHistogramRecord.histogramBucketNegativeCount, ExponentialHistogramBucketNegativeCount{
				HistogramID: histogram.HistogramID,
				DatapointID: expHistogramDatapoint.ID,
				CountID:     uuid.New().String(),
				Count:       negativeCounts.At(j),
			})
		}
	}

	// Handle Resource attributes. The rows must be appended: the record's
	// slice starts out nil, and copy() into a nil destination copies nothing.
	resourceAttributes := collect(
		resAttr,
		"Resource attribute key is empty",
		"Invalid resource attribute value",
	)
	for key, vtPair := range resourceAttributes {
		ra, err := e.newExponentialHistogramResourceAttributeValue(histogram.HistogramID, key, vtPair)
		if err != nil {
			e.logger.Error(err.Error())
			continue
		}
		kiExpHistogramRecord.histogramResourceAttribute = append(kiExpHistogramRecord.histogramResourceAttribute, *ra)
	}

	// Handle Scope attributes (appended for the same reason as above).
	scopeName := scopeInstr.Name()
	scopeVersion := scopeInstr.Version()
	scopeAttributes := collect(
		scopeInstr.Attributes(),
		"Scope attribute key is empty",
		"Invalid scope attribute value",
	)
	for key, vtPair := range scopeAttributes {
		sa, err := e.newExponentialHistogramScopeAttributeValue(histogram.HistogramID, key, scopeName, scopeVersion, vtPair)
		if err != nil {
			e.logger.Error(err.Error())
			continue
		}
		kiExpHistogramRecord.histogramScopeAttribute = append(kiExpHistogramRecord.histogramScopeAttribute, *sa)
	}

	return kiExpHistogramRecord, multierr.Combine(errs...)
}