func()

in pkg/export/export.go [552:605]


// Export turns the given batch of samples into hashed series protos and
// enqueues those that fall inside the range currently reported by the
// exporter's lease. Samples and exemplars that are skipped because of the
// range are added to the corresponding "dropped" counters.
//
// metadata is wrapped with e.wrapMetadata before it is handed to the sample
// builder; exemplarMap is passed to the builder alongside the batch. When the
// exporter is disabled, the batch is still counted as exported but nothing
// else happens.
func (e *Exporter) Export(metadata MetadataFunc, batch []record.RefSample, exemplarMap map[storage.SeriesRef]record.RefExemplar) {
	// Count every sample handed over from the appender commit, whether or not
	// it ends up being sent.
	sampleCount := float64(len(batch))
	samplesExported.Add(sampleCount)

	if e.opts.Disable {
		return
	}

	metadata = e.wrapMetadata(metadata)

	// Read the external labels and the lease range under the mutex, then
	// release it before doing any further work.
	e.mtx.Lock()
	extLabels := e.externalLabels
	rangeStart, rangeEnd, haveRange := e.lease.Range()
	e.mtx.Unlock()

	exemplarCount := float64(len(exemplarMap))
	if !haveRange {
		// No usable range: the whole batch and all exemplars are dropped.
		exemplarsDropped.WithLabelValues("not-in-ha-range").Add(exemplarCount)
		samplesDropped.WithLabelValues("not-in-ha-range").Add(sampleCount)
		return
	}

	sb := newSampleBuilder(e.seriesCache)
	defer sb.close()
	exemplarsExported.Add(exemplarCount)

	for len(batch) > 0 {
		series, remaining, err := sb.next(metadata, extLabels, batch, exemplarMap)
		// Always continue from whatever the builder returned as the remainder,
		// including when it reports an error.
		batch = remaining
		if err != nil {
			//nolint:errcheck
			level.Debug(e.logger).Log("msg", "building sample failed", "err", err)
			continue
		}

		for _, hs := range series {
			// Only samples within our HA range are enqueued.
			if sampleInRange(hs.proto, rangeStart, rangeEnd) {
				e.enqueue(hs.hash, hs.proto)
				continue
			}

			// Out of range. Hashed series protos should only ever have one
			// point; if it is a distribution, account for any exemplars it
			// carries as dropped as well.
			dist := hs.proto.Points[0].Value.GetDistributionValue()
			if dist != nil {
				exemplarsDropped.WithLabelValues("not-in-ha-range").Add(float64(len(dist.GetExemplars())))
			}
			samplesDropped.WithLabelValues("not-in-ha-range").Inc()
		}
	}

	// Signal that new data is available.
	e.triggerNext()
}