in exporter/collector/metrics.go [536:611]
func (me *MetricsExporter) readWALAndExport(ctx context.Context) error {
me.wal.mutex.Lock()
defer me.wal.mutex.Unlock()
// close and reopen the WAL to sync indices
readIndex, writeIndex, err := me.setupWAL()
if err != nil {
return err
}
bytes, err := me.wal.Read(readIndex)
if err == nil {
req := new(monitoringpb.CreateTimeSeriesRequest)
if err = proto.Unmarshal(bytes, req); err != nil {
return err
}
// on network failures, retry exponentially a max of 11 times (2^12s > 48 hours, older than allowed by GCM)
// or until user-configured max backoff is hit.
backoff := 0
for i := 0; i < 12; i++ {
err = me.export(ctx, req)
if err != nil {
me.obs.log.Warn(fmt.Sprintf("error exporting to GCM: %+v", err))
}
// retry at same read index if retryable (network) error
if isNotRecoverable(err) {
break
}
me.obs.log.Error("retryable error, retrying request")
backoff = 1 << i
if time.Duration(backoff)*time.Second >= me.wal.maxBackoff {
break
}
time.Sleep(time.Duration(backoff) * time.Second)
}
// If we are at the last index, and this last index is not an empty request
// (we use empty requests to fill out the end of a log, and if we didn't check for them
// this would loop constantly adding empty requests onto the end)
if readIndex == writeIndex && !isEmptyReq(req) {
// This indicates that we are trying to truncate the last item in the WAL.
// If that is the case, write an empty request so we can truncate the last real request
// (the WAL library requires at least 1 entry).
// Doing so prevents double-exporting in the event of a collector restart.
emptyReq := &monitoringpb.CreateTimeSeriesRequest{}
bytes, bytesErr := proto.Marshal(emptyReq)
if bytesErr != nil {
return bytesErr
}
writeIndex++
err = me.wal.Write(writeIndex, bytes)
if err != nil {
return err
}
}
// Truncate if readIndex < writeIndex.
// This only happens if there are more entries in the WAL
// OR, we are at the last real entry and added an "empty" entry above, in which we also increment writeIndex.
// otherwise, we've reached the end of the WAL and should be at an empty entry, which the export drops.
// If that's the case, and we try to truncate (ie, move readIndex+1), the library returns ErrOutOfRange.
if readIndex >= writeIndex {
// wal.ErrNotFound is used by wal.Read() to indicate the end of the WAL, but
// the wal library doesn't know about our hackery around empty entries.
// So it's used by us to indicate the same.
return wal.ErrNotFound
}
err = me.wal.TruncateFront(readIndex + 1)
if err != nil {
return err
}
}
return err
}