in prometheus-to-sd/translator/stackdriver.go [42:81]
// SendToStackdriver pushes the given time series to Stackdriver on behalf of
// the component named in config.SourceConfig.Component.
//
// The series are cut into batches of at most maxTimeseriesPerRequest entries
// and every batch is sent from its own goroutine through
// client.CreateServiceTimeSeries; the function returns only after all of those
// goroutines have finished. Nothing is reported back to the caller: a failed
// batch is logged and counted in timeseriesDropped, while a successful batch
// is counted in timeseriesPushed and contributes one metricIngestionLatency
// observation per series, measured from scrapeTimestamp to the moment the
// request returned.
func SendToStackdriver(ctx context.Context, client *monitoring.MetricClient, config *config.CommonConfig, ts []*v3.TimeSeries, scrapeTimestamp time.Time) {
	if len(ts) == 0 {
		glog.V(3).Infof("No metrics to send to Stackdriver for component %v", config.SourceConfig.Component)
		return
	}

	projectName := createProjectName(config.GceConfig)

	var (
		inFlight    sync.WaitGroup
		failedCount uint32 // updated atomically by the sender goroutines
	)

	// Peel one batch at a time off the front of the remaining series.
	for pending := ts; len(pending) > 0; {
		size := maxTimeseriesPerRequest
		if size > len(pending) {
			size = len(pending)
		}
		batch := pending[:size]
		pending = pending[size:]

		inFlight.Add(1)
		go func(batch []*v3.TimeSeries) {
			defer inFlight.Done()

			err := client.CreateServiceTimeSeries(ctx, &v3.CreateTimeSeriesRequest{
				Name:       projectName,
				TimeSeries: batch,
			})
			// Taken right after the call returns so the latency includes the request itself.
			finishedAt := time.Now()
			if err != nil {
				atomic.AddUint32(&failedCount, uint32(len(batch)))
				glog.Errorf("Error while sending request to Stackdriver %v", err)
				return
			}

			// Every series in the batch shares the same scrape and completion
			// times, so the same value is recorded once per series.
			latency := metricIngestionLatency.WithLabelValues(config.SourceConfig.Component)
			elapsedSeconds := finishedAt.Sub(scrapeTimestamp).Seconds()
			for range batch {
				latency.Observe(elapsedSeconds)
			}
		}(batch)
	}
	inFlight.Wait()

	// A plain read is fine here: Wait returns only after every goroutine has
	// called Done, i.e. after all atomic updates to failedCount.
	sentCount := uint32(len(ts)) - failedCount
	component := config.SourceConfig.Component
	glog.V(4).Infof("Successfully sent %v timeseries to Stackdriver for component %v", sentCount, component)
	timeseriesPushed.WithLabelValues(component).Add(float64(sentCount))
	timeseriesDropped.WithLabelValues(component).Add(float64(failedCount))
}