in loadgen/eventhandler/handler.go [306:368]
// sendBatch writes the events of b in one or more chunks and hands each
// chunk to the configured transport.
//
// When s.burst is positive, a chunk never extends past the next multiple of
// s.burst in s.sent, and the limiter is awaited for s.burst tokens each time
// s.sent sits exactly on such a multiple. When s.burst is zero or negative,
// all remaining events go out in a single chunk with no limiter wait.
//
// s.sent is advanced by the chunk size after every successful send. The
// first error from the limiter, the writer function, closing the writer, or
// the transport is wrapped and returned immediately.
func (h *Handler) sendBatch(
	ctx context.Context,
	s *state,
	b batch,
	baseTimestamp time.Time,
	randomBits uint64,
) error {
	h.logger.Debug("sending events in burst", zap.Int("size", len(b.events)))

	pending := b.events
	for len(pending) > 0 {
		// Default to sending everything that is left; the burst logic
		// below may shrink this.
		chunk := len(pending)
		if s.burst > 0 {
			used := s.sent % s.burst
			if used == 0 {
				// A fresh burst iteration begins here: block until the
				// limiter grants a whole burst.
				err := h.config.Limiter.WaitN(ctx, s.burst)
				if err != nil {
					return fmt.Errorf("cannot wait for limiter to allow burst: %w", err)
				}
			}
			// Only the part of the burst not yet consumed in this
			// iteration is available to the current chunk.
			room := s.burst - used
			h.logger.Debug("calculated capacity", zap.Int("value", room))
			if room < chunk {
				chunk = room
			}
		}

		part := batch{
			metadata: b.metadata,
			events:   pending[:chunk],
		}

		// A new writer is created for every chunk on purpose: in error
		// cases SendEvents may return while the HTTP library is still
		// transmitting the request body, so reusing the writer could
		// panic due to concurrent reads and writes.
		w := newEventWriter()
		err := h.config.Writer(h.config, h.minTimestamp, w, part, baseTimestamp, randomBits)
		if err != nil {
			return fmt.Errorf("cannot write events: %w", err)
		}
		if err = w.Close(); err != nil {
			return fmt.Errorf("cannot close writer: %w", err)
		}
		h.logger.Debug("wrote events to buffer",
			zap.Int("bytes.uncompressed", w.written),
			zap.Int("bytes.compressed", w.buf.Len()),
		)
		h.logger.Debug("closed writer")

		err = h.config.Transport.SendEvents(ctx, &w.buf, h.config.IgnoreErrors)
		if err != nil {
			return fmt.Errorf("cannot send events through transport: %w", err)
		}
		h.logger.Debug("sent events through transport")

		// Record progress and drop the chunk that was just sent.
		s.sent += chunk
		pending = pending[chunk:]
	}
	return nil
}