func()

in exporter/kineticaexporter/metrics_exporter.go [1161:1353]


func (e *kineticaMetricsExporter) createGaugeRecord(resAttr pcommon.Map, _ string, scopeInstr pcommon.InstrumentationScope, _ string, gaugeRecord pmetric.Gauge, name, description, unit string) (*kineticaGaugeRecord, error) {
	var errs []error

	kiGaugeRecord := new(kineticaGaugeRecord)

	gauge := &Gauge{
		GaugeID:     uuid.New().String(),
		MetricName:  name,
		Description: description,
		Unit:        unit,
	}

	kiGaugeRecord.gauge = gauge

	// Handle data points
	var gaugeDatapointAttribute []GaugeDatapointAttribute
	gaugeDatapointAttributes := make(map[string]ValueTypePair)

	var exemplarAttribute []GaugeDataPointExemplarAttribute
	exemplarAttributes := make(map[string]ValueTypePair)

	for i := 0; i < gaugeRecord.DataPoints().Len(); i++ {
		datapoint := gaugeRecord.DataPoints().At(i)

		gaugeDatapoint := GaugeDatapoint{
			GaugeID:       gauge.GaugeID,
			ID:            uuid.New().String(),
			StartTimeUnix: datapoint.StartTimestamp().AsTime().UnixMilli(),
			TimeUnix:      datapoint.Timestamp().AsTime().UnixMilli(),
			GaugeValue:    datapoint.DoubleValue(),
			Flags:         int(datapoint.Flags()),
		}
		kiGaugeRecord.datapoint = append(kiGaugeRecord.datapoint, gaugeDatapoint)

		for k, v := range datapoint.Attributes().All() {
			if k == "" {
				e.logger.Debug("Gauge record attribute key is empty")
			} else if v, err := attributeValueToKineticaFieldValue(v); err == nil {
				gaugeDatapointAttributes[k] = v
			} else {
				e.logger.Debug("Invalid gauge record attribute value", zap.String("Error", err.Error()))
				errs = append(errs, err)
			}
		}

		for key := range gaugeDatapointAttributes {
			vtPair := gaugeDatapointAttributes[key]
			ga, err := e.newGaugeDatapointAttributeValue(gauge.GaugeID, gaugeDatapoint.ID, key, vtPair)
			if err != nil {
				e.logger.Error(err.Error())
			} else {
				gaugeDatapointAttribute = append(gaugeDatapointAttribute, *ga)
			}
		}

		kiGaugeRecord.datapointAttribute = append(kiGaugeRecord.datapointAttribute, gaugeDatapointAttribute...)

		for k := range gaugeDatapointAttributes {
			delete(gaugeDatapointAttributes, k)
		}

		// Handle data point exemplars
		exemplars := datapoint.Exemplars()
		for i := 0; i < exemplars.Len(); i++ {
			exemplar := exemplars.At(i)
			gaugeDatapointExemplar := GaugeDatapointExemplar{
				GaugeID:     gauge.GaugeID,
				DatapointID: gaugeDatapoint.ID,
				ExemplarID:  uuid.New().String(),
				TimeUnix:    exemplar.Timestamp().AsTime().UnixMilli(),
				GaugeValue:  exemplar.DoubleValue(),
				TraceID:     exemplar.TraceID().String(),
				SpanID:      exemplar.SpanID().String(),
			}
			kiGaugeRecord.exemplars = append(kiGaugeRecord.exemplars, gaugeDatapointExemplar)

			// Handle Exemplar attribute
			for k, v := range exemplar.FilteredAttributes().All() {
				if k == "" {
					e.logger.Debug("Sum record attribute key is empty")
				} else if v, err := attributeValueToKineticaFieldValue(v); err == nil {
					exemplarAttributes[k] = v
				} else {
					e.logger.Debug("Invalid sum record attribute value", zap.String("Error", err.Error()))
					errs = append(errs, err)
				}
			}

			for key := range exemplarAttributes {
				vtPair := exemplarAttributes[key]
				ea, err := e.newGaugeDatapointExemplarAttributeValue(gauge.GaugeID, gaugeDatapoint.ID, gaugeDatapointExemplar.ExemplarID, key, vtPair)
				if err != nil {
					e.logger.Error(err.Error())
				} else {
					exemplarAttribute = append(exemplarAttribute, *ea)
				}
			}

			kiGaugeRecord.exemplarAttribute = append(kiGaugeRecord.exemplarAttribute, exemplarAttribute...)

			for k := range exemplarAttributes {
				delete(exemplarAttributes, k)
			}
		}
	}

	// Handle Resource attribute
	e.logger.Debug("Resource Attributes received ->", zap.Any("Attributes", resAttr))

	var resourceAttribute []GaugeResourceAttribute
	resourceAttributes := make(map[string]ValueTypePair)

	if resAttr.Len() > 0 {
		for k, v := range resAttr.All() {
			if k == "" {
				e.logger.Debug("Resource attribute key is empty")
			} else if v, err := attributeValueToKineticaFieldValue(v); err == nil {
				resourceAttributes[k] = v
			} else {
				e.logger.Debug("Invalid resource attribute value", zap.String("Error", err.Error()))
				errs = append(errs, err)
			}
		}

		e.logger.Debug("Resource Attributes to be added ->", zap.Any("Attributes", resourceAttributes))
		for key := range resourceAttributes {
			vtPair := resourceAttributes[key]
			ga, err := e.newGaugeResourceAttributeValue(gauge.GaugeID, key, vtPair)

			e.logger.Debug("New resource attribute ->", zap.Any("Attribute", ga))

			if err != nil {
				e.logger.Error(err.Error())
			} else {
				resourceAttribute = append(resourceAttribute, *ga)
			}
		}

		kiGaugeRecord.resourceAttribute = make([]GaugeResourceAttribute, len(resourceAttribute))
		copy(kiGaugeRecord.resourceAttribute, resourceAttribute)
		e.logger.Debug("Resource Attributes actually added ->", zap.Any("Attributes", kiGaugeRecord.resourceAttribute))
	}

	// Handle Scope attribute
	e.logger.Debug("Scope Attributes received ->", zap.Any("Attributes", scopeInstr.Attributes()))

	var scopeAttribute []GaugeScopeAttribute
	scopeAttributes := make(map[string]ValueTypePair)
	scopeName := scopeInstr.Name()
	scopeVersion := scopeInstr.Version()

	if scopeInstr.Attributes().Len() > 0 {
		for k, v := range scopeInstr.Attributes().All() {
			if k == "" {
				e.logger.Debug("Scope attribute key is empty")
			} else if v, err := attributeValueToKineticaFieldValue(v); err == nil {
				scopeAttributes[k] = v
			} else {
				e.logger.Debug("Invalid scope attribute value", zap.String("Error", err.Error()))
				errs = append(errs, err)
			}
		}

		e.logger.Debug("Scope Attributes to be added ->", zap.Any("Attributes", scopeAttributes))
		for key := range scopeAttributes {
			vtPair := scopeAttributes[key]
			ga, err := e.newGaugeScopeAttributeValue(gauge.GaugeID, key, scopeName, scopeVersion, vtPair)

			e.logger.Debug("New scope attribute ->", zap.Any("Attribute", ga))

			if err != nil {
				e.logger.Error(err.Error())
			} else {
				scopeAttribute = append(scopeAttribute, *ga)
			}
		}

		kiGaugeRecord.scopeAttribute = make([]GaugeScopeAttribute, len(scopeAttribute))
		copy(kiGaugeRecord.scopeAttribute, scopeAttribute)
		e.logger.Debug("Scope Attributes actually added ->", zap.Any("Attributes", kiGaugeRecord.scopeAttribute))
	} else {
		// No attributes found - just basic scope
		kiGaugeRecord.scopeAttribute = append(kiGaugeRecord.scopeAttribute, GaugeScopeAttribute{
			GaugeID:        gauge.GaugeID,
			ScopeName:      scopeName,
			ScopeVersion:   scopeVersion,
			Key:            "",
			AttributeValue: AttributeValue{},
		})
	}

	return kiGaugeRecord, multierr.Combine(errs...)
}