in exporter/collector/metrics.go [329:438]
// PushMetrics maps the given metrics to monitoringpb time series, groups them
// by destination project, and sends them in batches of at most sendBatchSize.
//
// When a write-ahead log is configured (me.wal != nil), each batch request is
// marshaled and appended to the WAL instead of being exported directly;
// otherwise each batch is passed to me.export.
//
// Unless descriptor creation is skipped by configuration, a
// CreateMetricDescriptorRequest is also queued on me.metricDescriptorC for
// every non-nil descriptor. Queueing is non-blocking: requests are dropped if
// the channel cannot accept them.
//
// It returns an error if the exporter has no client, otherwise the
// errors.Join of every per-batch failure (nil if there were none). A failure
// in one batch does not stop the remaining batches from being processed.
func (me *MetricsExporter) PushMetrics(ctx context.Context, m pmetric.Metrics) error {
	if me.client == nil {
		return errors.New("not started")
	}
	if me.wal != nil {
		// Held until return so the LastIndex/Write pairs below cannot be
		// interleaved with other holders of this mutex.
		me.wal.mutex.Lock()
		defer me.wal.mutex.Unlock()
	}

	// map from project -> []timeseries. This groups timeseries by the project
	// they need to be sent to. Each project's timeseries are sent in a
	// separate request later.
	pendingTimeSeries := map[string][]*monitoringpb.TimeSeries{}

	// add extra metrics from the ExtraMetrics() extension point
	if me.cfg.MetricConfig.ExtraMetrics != nil {
		me.cfg.MetricConfig.ExtraMetrics(m)
	}

	// We only send metric descriptors if we're configured *and* we're not
	// sending service timeseries. This is invariant for the whole call.
	skipDescriptors := me.cfg.MetricConfig.SkipCreateMetricDescriptor || me.cfg.MetricConfig.CreateServiceTimeSeries

	rms := m.ResourceMetrics()
	for i := 0; i < rms.Len(); i++ {
		rm := rms.At(i)
		res := rm.Resource()
		monitoredResource := me.cfg.MetricConfig.MapMonitoredResource(res)
		extraResourceLabels := attributesToLabels(filterAttributes(res.Attributes(), me.cfg.MetricConfig.ServiceResourceLabels, me.cfg.MetricConfig.ResourceFilters))

		projectID := me.cfg.ProjectID
		// override project ID with gcp.project.id, if present
		if projectFromResource, found := res.Attributes().Get(resourcemapping.ProjectIDAttributeKey); found {
			projectID = projectFromResource.AsString()
		}

		sms := rm.ScopeMetrics()
		for j := 0; j < sms.Len(); j++ {
			sm := sms.At(j)
			instrumentationScopeLabels := me.mapper.instrumentationScopeToLabels(sm.Scope())
			metricLabels := mergeLabels(nil, instrumentationScopeLabels, extraResourceLabels)

			mes := sm.Metrics()
			for k := 0; k < mes.Len(); k++ {
				metric := mes.At(k)
				pendingTimeSeries[projectID] = append(pendingTimeSeries[projectID], me.mapper.metricToTimeSeries(monitoredResource, metricLabels, metric, projectID)...)

				if skipDescriptors {
					continue
				}
				for _, md := range me.mapper.metricDescriptor(metric, metricLabels) {
					if md == nil {
						continue
					}
					req := &monitoringpb.CreateMetricDescriptorRequest{
						Name:             projectName(projectID),
						MetricDescriptor: md,
					}
					select {
					case me.metricDescriptorC <- req:
					default:
						// Ignore drops, we'll catch descriptor next time around.
					}
				}
			}
		}
	}

	var errs []error
	// timeseries for each project are batched and exported separately
	for projectID, projectTS := range pendingTimeSeries {
		for len(projectTS) > 0 {
			// Take at most sendBatchSize series off the front of the queue.
			sendSize := len(projectTS)
			if sendSize > sendBatchSize {
				sendSize = sendBatchSize
			}
			var ts []*monitoringpb.TimeSeries
			ts, projectTS = projectTS[:sendSize], projectTS[sendSize:]

			req := &monitoringpb.CreateTimeSeriesRequest{
				Name:       projectName(projectID),
				TimeSeries: ts,
			}

			if me.wal == nil {
				// No WAL configured: export directly. Only real failures are
				// recorded so errs does not fill up with nil entries.
				if err := me.export(ctx, req); err != nil {
					errs = append(errs, err)
				}
				continue
			}

			// Push the request onto the WAL. Causes are wrapped with %w so
			// callers can still inspect them via errors.Is / errors.As.
			payload, err := proto.Marshal(req)
			if err != nil {
				errs = append(errs, fmt.Errorf("failed to marshal protobuf to bytes: %w", err))
				continue
			}
			writeIndex, err := me.wal.LastIndex()
			if err != nil {
				errs = append(errs, fmt.Errorf("failed to get LastIndex of WAL: %w", err))
				continue
			}
			if err := me.wal.Write(writeIndex+1, payload); err != nil {
				errs = append(errs, fmt.Errorf("failed to write to WAL: %w", err))
			}
		}
	}
	return errors.Join(errs...)
}