in internal/pkg/bulk/engine.go [458:529]
func (b *Bulker) flushQueue(ctx context.Context, w *semaphore.Weighted, queue queueT) error {
start := time.Now()
zerolog.Ctx(ctx).Trace().
Str("mod", kModBulk).
Int("cnt", queue.cnt).
Int("szPending", queue.pending).
Str("queue", queue.Type()).
Msg("flushQueue Wait")
acquireCtx, cancel := context.WithTimeout(ctx, defaultFlushContextTimeout)
defer cancel()
if err := w.Acquire(acquireCtx, 1); err != nil {
zerolog.Ctx(ctx).Error().Err(err).Msg("flushQueue Wait error")
return err
}
zerolog.Ctx(ctx).Trace().
Str("mod", kModBulk).
Int("cnt", queue.cnt).
Dur("tdiff", time.Since(start)).
Int("szPending", queue.pending).
Str("queue", queue.Type()).
Msg("flushQueue Acquired")
go func() {
start := time.Now()
// deadline prevents bulker being blocked on flush
flushCtx, cancel := context.WithTimeout(ctx, defaultFlushContextTimeout)
defer cancel()
if b.tracer != nil {
trans := b.tracer.StartTransaction(fmt.Sprintf("Flush queue %s", queue.Type()), "bulker")
trans.Context.SetLabel("queue.size", queue.cnt)
trans.Context.SetLabel("queue.pending", queue.pending)
ctx = apm.ContextWithTransaction(ctx, trans)
defer trans.End()
}
defer w.Release(1)
var err error
switch queue.ty {
case kQueueRead, kQueueRefreshRead:
err = b.flushRead(flushCtx, queue)
case kQueueSearch, kQueueFleetSearch:
err = b.flushSearch(flushCtx, queue)
case kQueueAPIKeyUpdate:
err = b.flushUpdateAPIKey(flushCtx, queue)
default:
err = b.flushBulk(flushCtx, queue)
}
if err != nil {
failQueue(queue, err)
apm.CaptureError(ctx, err).Send()
}
zerolog.Ctx(ctx).Trace().
Err(err).
Str("mod", kModBulk).
Int("cnt", queue.cnt).
Int("szPending", queue.pending).
Str("queue", queue.Type()).
Dur("rtt", time.Since(start)).
Msg("flushQueue Done")
}()
return nil
}