func()

in loadgen/eventhandler/handler.go [306:368]


// sendBatch writes the events of b and sends them through the configured
// transport, in one or more chunks.
//
// When s.burst is positive, each chunk is capped so that it never extends
// past the next multiple of s.burst in s.sent, and the limiter is asked for
// s.burst tokens every time s.sent sits exactly on such a multiple. When
// s.burst is not positive, all remaining events go out as a single chunk.
//
// s.sent is advanced by the chunk size after each successful send. The first
// error from the limiter, the writer callback, closing the writer, or the
// transport is wrapped and returned immediately; events already sent stay
// counted in s.sent.
func (h *Handler) sendBatch(
	ctx context.Context,
	s *state,
	b batch,
	baseTimestamp time.Time,
	randomBits uint64,
) error {
	pending := b.events
	h.logger.Debug("sending events in burst", zap.Int("size", len(pending)))

	for len(pending) > 0 {
		// Default to everything that is left; burst accounting may shrink it.
		count := len(pending)
		if s.burst > 0 {
			usedInBurst := s.sent % s.burst
			if usedInBurst == 0 {
				// A fresh burst begins here, so block until the limiter
				// grants a whole burst worth of tokens.
				waitErr := h.config.Limiter.WaitN(ctx, s.burst)
				if waitErr != nil {
					return fmt.Errorf("cannot wait for limiter to allow burst: %w", waitErr)
				}
			}
			// Whatever part of the current burst is still unused bounds how
			// many events this chunk may carry.
			remaining := s.burst - usedInBurst
			h.logger.Debug("calculated capacity", zap.Int("value", remaining))

			if remaining < count {
				count = remaining
			}
		}

		// A new writer is created for every chunk and never reused: in error
		// cases SendEvents may return while the HTTP library is still
		// transmitting the request body, and reusing the writer could cause
		// a panic due to concurrent reads & writes.
		w := newEventWriter()
		chunk := batch{
			metadata: b.metadata,
			events:   pending[:count],
		}
		writeErr := h.config.Writer(h.config, h.minTimestamp, w, chunk, baseTimestamp, randomBits)
		if writeErr != nil {
			return fmt.Errorf("cannot write events: %w", writeErr)
		}

		closeErr := w.Close()
		if closeErr != nil {
			return fmt.Errorf("cannot close writer: %w", closeErr)
		}
		h.logger.Debug("wrote events to buffer",
			zap.Int("bytes.uncompressed", w.written),
			zap.Int("bytes.compressed", w.buf.Len()),
		)

		h.logger.Debug("closed writer")

		sendErr := h.config.Transport.SendEvents(ctx, &w.buf, h.config.IgnoreErrors)
		if sendErr != nil {
			return fmt.Errorf("cannot send events through transport: %w", sendErr)
		}
		h.logger.Debug("sent events through transport")

		// Record progress and drop the chunk that was just sent.
		s.sent += count
		pending = pending[count:]
	}
	return nil
}