in scrape/scrape.go [1508:1755]
func (sl *scrapeLoop) append(app storage.Appender, b []byte, contentType string, ts time.Time) (total, added, seriesAdded int, err error) {
p, err := textparse.New(b, contentType, sl.scrapeClassicHistograms)
if err != nil {
level.Debug(sl.l).Log(
"msg", "Invalid content type on scrape, using prometheus parser as fallback.",
"content_type", contentType,
"err", err,
)
}
var (
defTime = timestamp.FromTime(ts)
appErrs = appendErrors{}
sampleLimitErr error
bucketLimitErr error
e exemplar.Exemplar // escapes to heap so hoisted out of loop
meta metadata.Metadata
metadataChanged bool
)
// updateMetadata updates the current iteration's metadata object and the
// metadataChanged value if we have metadata in the scrape cache AND the
// labelset is for a new series or the metadata for this series has just
// changed. It returns a boolean based on whether the metadata was updated.
updateMetadata := func(lset labels.Labels, isNewSeries bool) bool {
if !sl.appendMetadataToWAL {
return false
}
sl.cache.metaMtx.Lock()
defer sl.cache.metaMtx.Unlock()
metaEntry, metaOk := sl.cache.metadata[lset.Get(labels.MetricName)]
if metaOk && (isNewSeries || metaEntry.lastIterChange == sl.cache.iter) {
metadataChanged = true
meta.Type = metaEntry.Type
meta.Unit = metaEntry.Unit
meta.Help = metaEntry.Help
return true
}
return false
}
// Take an appender with limits.
app = appender(app, sl.sampleLimit, sl.bucketLimit)
defer func() {
if err != nil {
return
}
// Only perform cache cleaning if the scrape was not empty.
// An empty scrape (usually) is used to indicate a failed scrape.
sl.cache.iterDone(len(b) > 0)
}()
loop:
for {
var (
et textparse.Entry
sampleAdded, isHistogram bool
met []byte
parsedTimestamp *int64
val float64
h *histogram.Histogram
fh *histogram.FloatHistogram
)
if et, err = p.Next(); err != nil {
if errors.Is(err, io.EOF) {
err = nil
}
break
}
switch et {
case textparse.EntryType:
sl.cache.setType(p.Type())
continue
case textparse.EntryHelp:
sl.cache.setHelp(p.Help())
continue
case textparse.EntryUnit:
sl.cache.setUnit(p.Unit())
continue
case textparse.EntryComment:
continue
case textparse.EntryHistogram:
isHistogram = true
default:
}
total++
t := defTime
if isHistogram {
met, parsedTimestamp, h, fh = p.Histogram()
} else {
met, parsedTimestamp, val = p.Series()
}
if !sl.honorTimestamps {
parsedTimestamp = nil
}
if parsedTimestamp != nil {
t = *parsedTimestamp
}
// Zero metadata out for current iteration until it's resolved.
meta = metadata.Metadata{}
metadataChanged = false
if sl.cache.getDropped(met) {
continue
}
ce, ok := sl.cache.get(met)
var (
ref storage.SeriesRef
lset labels.Labels
hash uint64
)
if ok {
ref = ce.ref
lset = ce.lset
// Update metadata only if it changed in the current iteration.
updateMetadata(lset, false)
} else {
p.Metric(&lset)
hash = lset.Hash()
// Hash label set as it is seen local to the target. Then add target labels
// and relabeling and store the final label set.
lset = sl.sampleMutator(lset)
// The label set may be set to empty to indicate dropping.
if lset.IsEmpty() {
sl.cache.addDropped(met)
continue
}
if !lset.Has(labels.MetricName) {
err = errNameLabelMandatory
break loop
}
if !lset.IsValid() {
err = fmt.Errorf("invalid metric name or label names: %s", lset.String())
break loop
}
// If any label limits is exceeded the scrape should fail.
if err = verifyLabelLimits(lset, sl.labelLimits); err != nil {
targetScrapePoolExceededLabelLimits.Inc()
break loop
}
// Append metadata for new series if they were present.
updateMetadata(lset, true)
}
if isHistogram {
if h != nil {
ref, err = app.AppendHistogram(ref, lset, t, h, nil)
} else {
ref, err = app.AppendHistogram(ref, lset, t, nil, fh)
}
} else {
ref, err = app.Append(ref, lset, t, val)
}
sampleAdded, err = sl.checkAddError(ce, met, parsedTimestamp, err, &sampleLimitErr, &bucketLimitErr, &appErrs)
if err != nil {
if err != storage.ErrNotFound {
level.Debug(sl.l).Log("msg", "Unexpected error", "series", string(met), "err", err)
}
break loop
}
if !ok {
if parsedTimestamp == nil {
// Bypass staleness logic if there is an explicit timestamp.
sl.cache.trackStaleness(hash, lset)
}
sl.cache.addRef(met, ref, lset, hash)
if sampleAdded && sampleLimitErr == nil && bucketLimitErr == nil {
seriesAdded++
}
}
// Increment added even if there's an error so we correctly report the
// number of samples remaining after relabeling.
added++
if hasExemplar := p.Exemplar(&e); hasExemplar {
if !e.HasTs {
e.Ts = t
}
_, exemplarErr := app.AppendExemplar(ref, lset, e)
exemplarErr = sl.checkAddExemplarError(exemplarErr, e, &appErrs)
if exemplarErr != nil {
// Since exemplar storage is still experimental, we don't fail the scrape on ingestion errors.
level.Debug(sl.l).Log("msg", "Error while adding exemplar in AddExemplar", "exemplar", fmt.Sprintf("%+v", e), "err", exemplarErr)
}
e = exemplar.Exemplar{} // reset for next time round loop
}
if sl.appendMetadataToWAL && metadataChanged {
if _, merr := app.UpdateMetadata(ref, lset, meta); merr != nil {
// No need to fail the scrape on errors appending metadata.
level.Debug(sl.l).Log("msg", "Error when appending metadata in scrape loop", "ref", fmt.Sprintf("%d", ref), "metadata", fmt.Sprintf("%+v", meta), "err", merr)
}
}
}
if sampleLimitErr != nil {
if err == nil {
err = sampleLimitErr
}
// We only want to increment this once per scrape, so this is Inc'd outside the loop.
targetScrapeSampleLimit.Inc()
}
if bucketLimitErr != nil {
if err == nil {
err = bucketLimitErr // If sample limit is hit, that error takes precedence.
}
// We only want to increment this once per scrape, so this is Inc'd outside the loop.
targetScrapeNativeHistogramBucketLimit.Inc()
}
if appErrs.numOutOfOrder > 0 {
level.Warn(sl.l).Log("msg", "Error on ingesting out-of-order samples", "num_dropped", appErrs.numOutOfOrder)
}
if appErrs.numDuplicates > 0 {
level.Warn(sl.l).Log("msg", "Error on ingesting samples with different value but same timestamp", "num_dropped", appErrs.numDuplicates)
}
if appErrs.numOutOfBounds > 0 {
level.Warn(sl.l).Log("msg", "Error on ingesting samples that are too old or are too far into the future", "num_dropped", appErrs.numOutOfBounds)
}
if appErrs.numExemplarOutOfOrder > 0 {
level.Warn(sl.l).Log("msg", "Error on ingesting out-of-order exemplars", "num_dropped", appErrs.numExemplarOutOfOrder)
}
if err == nil {
sl.cache.forEachStale(func(lset labels.Labels) bool {
// Series no longer exposed, mark it stale.
_, err = app.Append(0, lset, defTime, math.Float64frombits(value.StaleNaN))
switch errors.Cause(err) {
case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp:
// Do not count these in logging, as this is expected if a target
// goes away and comes back again with a new scrape loop.
err = nil
}
return err == nil
})
}
return
}