func()

in exporter/collector/metrics.go [536:611]


func (me *MetricsExporter) readWALAndExport(ctx context.Context) error {
	me.wal.mutex.Lock()
	defer me.wal.mutex.Unlock()

	// close and reopen the WAL to sync indices
	readIndex, writeIndex, err := me.setupWAL()
	if err != nil {
		return err
	}

	bytes, err := me.wal.Read(readIndex)
	if err == nil {
		req := new(monitoringpb.CreateTimeSeriesRequest)
		if err = proto.Unmarshal(bytes, req); err != nil {
			return err
		}

		// on network failures, retry exponentially a max of 11 times (2^12s > 48 hours, older than allowed by GCM)
		// or until user-configured max backoff is hit.
		backoff := 0
		for i := 0; i < 12; i++ {
			err = me.export(ctx, req)
			if err != nil {
				me.obs.log.Warn(fmt.Sprintf("error exporting to GCM: %+v", err))
			}
			// retry at same read index if retryable (network) error
			if isNotRecoverable(err) {
				break
			}
			me.obs.log.Error("retryable error, retrying request")
			backoff = 1 << i
			if time.Duration(backoff)*time.Second >= me.wal.maxBackoff {
				break
			}
			time.Sleep(time.Duration(backoff) * time.Second)
		}

		// If we are at the last index, and this last index is not an empty request
		// (we use empty requests to fill out the end of a log, and if we didn't check for them
		// this would loop constantly adding empty requests onto the end)
		if readIndex == writeIndex && !isEmptyReq(req) {
			// This indicates that we are trying to truncate the last item in the WAL.
			// If that is the case, write an empty request so we can truncate the last real request
			// (the WAL library requires at least 1 entry).
			// Doing so prevents double-exporting in the event of a collector restart.
			emptyReq := &monitoringpb.CreateTimeSeriesRequest{}
			bytes, bytesErr := proto.Marshal(emptyReq)
			if bytesErr != nil {
				return bytesErr
			}

			writeIndex++
			err = me.wal.Write(writeIndex, bytes)
			if err != nil {
				return err
			}
		}

		// Truncate if readIndex < writeIndex.
		// This only happens if there are more entries in the WAL
		// OR, we are at the last real entry and added an "empty" entry above, in which we also increment writeIndex.
		// otherwise, we've reached the end of the WAL and should be at an empty entry, which the export drops.
		// If that's the case, and we try to truncate (ie, move readIndex+1), the library returns ErrOutOfRange.
		if readIndex >= writeIndex {
			// wal.ErrNotFound is used by wal.Read() to indicate the end of the WAL, but
			// the wal library doesn't know about our hackery around empty entries.
			// So it's used by us to indicate the same.
			return wal.ErrNotFound
		}
		err = me.wal.TruncateFront(readIndex + 1)
		if err != nil {
			return err
		}
	}
	return err
}