// Excerpt from exporter/collector/metrics.go (original lines 329-438).

// PushMetrics exports a batch of OTLP metrics to Google Cloud Monitoring.
//
// Timeseries are grouped by destination project (the configured project ID,
// overridable per-resource via the gcp.project.id resource attribute), then
// split into batches of at most sendBatchSize. Each batch is either appended
// to the write-ahead log (when a WAL is configured; a background reader
// exports it later) or exported directly. Metric descriptors are sent on a
// best-effort basis through a bounded channel and are skipped entirely when
// descriptor creation is disabled or service timeseries are being sent.
//
// Returns an error joining every per-batch failure; nil if all batches were
// handled. Returns immediately with an error if the exporter was not started.
func (me *MetricsExporter) PushMetrics(ctx context.Context, m pmetric.Metrics) error {
	if me.client == nil {
		return errors.New("not started")
	}
	// Hold the WAL lock for the whole push so concurrent pushes cannot
	// interleave their LastIndex/Write pairs and corrupt the index order.
	if me.wal != nil {
		me.wal.mutex.Lock()
		defer me.wal.mutex.Unlock()
	}
	// map from project -> []timeseries. This groups timeseries by the project
	// they need to be sent to. Each project's timeseries are sent in a
	// separate request later.
	pendingTimeSeries := map[string][]*monitoringpb.TimeSeries{}

	// add extra metrics from the ExtraMetrics() extension point
	if me.cfg.MetricConfig.ExtraMetrics != nil {
		me.cfg.MetricConfig.ExtraMetrics(m)
	}
	rms := m.ResourceMetrics()

	for i := 0; i < rms.Len(); i++ {
		rm := rms.At(i)
		monitoredResource := me.cfg.MetricConfig.MapMonitoredResource(rm.Resource())
		// Resource attributes that pass the configured filters become extra
		// metric labels rather than monitored-resource labels.
		extraResourceLabels := attributesToLabels(filterAttributes(rm.Resource().Attributes(), me.cfg.MetricConfig.ServiceResourceLabels, me.cfg.MetricConfig.ResourceFilters))
		projectID := me.cfg.ProjectID
		// override project ID with gcp.project.id, if present
		if projectFromResource, found := rm.Resource().Attributes().Get(resourcemapping.ProjectIDAttributeKey); found {
			projectID = projectFromResource.AsString()
		}
		sms := rm.ScopeMetrics()
		for j := 0; j < sms.Len(); j++ {
			sm := sms.At(j)

			instrumentationScopeLabels := me.mapper.instrumentationScopeToLabels(sm.Scope())
			metricLabels := mergeLabels(nil, instrumentationScopeLabels, extraResourceLabels)

			mes := sm.Metrics()
			for k := 0; k < mes.Len(); k++ {
				metric := mes.At(k)
				pendingTimeSeries[projectID] = append(pendingTimeSeries[projectID], me.mapper.metricToTimeSeries(monitoredResource, metricLabels, metric, projectID)...)

				// We only send metric descriptors if we're configured *and* we're not sending service timeseries.
				if me.cfg.MetricConfig.SkipCreateMetricDescriptor || me.cfg.MetricConfig.CreateServiceTimeSeries {
					continue
				}

				for _, md := range me.mapper.metricDescriptor(metric, metricLabels) {
					if md == nil {
						continue
					}
					req := &monitoringpb.CreateMetricDescriptorRequest{
						Name:             projectName(projectID),
						MetricDescriptor: md,
					}
					// Non-blocking send: descriptor creation is best-effort
					// and must never stall the metrics push.
					select {
					case me.metricDescriptorC <- req:
					default:
						// Ignore drops, we'll catch descriptor next time around.
					}
				}
			}
		}
	}

	var errs []error
	// timeseries for each project are batched and exported separately
	for projectID, projectTS := range pendingTimeSeries {
		// Batch and export
		for len(projectTS) > 0 {
			var sendSize int
			if len(projectTS) < sendBatchSize {
				sendSize = len(projectTS)
			} else {
				sendSize = sendBatchSize
			}

			var ts []*monitoringpb.TimeSeries
			ts, projectTS = projectTS[:sendSize], projectTS[sendSize:]

			req := &monitoringpb.CreateTimeSeriesRequest{
				Name:       projectName(projectID),
				TimeSeries: ts,
			}

			if me.wal != nil {
				// push request onto the WAL; wrap errors with %w so callers
				// can inspect the cause via errors.Is/errors.As.
				bytes, err := proto.Marshal(req)
				if err != nil {
					errs = append(errs, fmt.Errorf("failed to marshal protobuf to bytes: %w", err))
					continue
				}

				writeIndex, err := me.wal.LastIndex()
				if err != nil {
					errs = append(errs, fmt.Errorf("failed to get LastIndex of WAL: %w", err))
					continue
				}

				err = me.wal.Write(writeIndex+1, bytes)
				if err != nil {
					errs = append(errs, fmt.Errorf("failed to write to WAL: %w", err))
					continue
				}
			} else {
				// otherwise export directly; a nil result is appended but
				// discarded by errors.Join below.
				errs = append(errs, me.export(ctx, req))
			}
		}
	}
	return errors.Join(errs...)
}