func()

in internal/pkg/bulk/engine.go [458:529]


func (b *Bulker) flushQueue(ctx context.Context, w *semaphore.Weighted, queue queueT) error {
	start := time.Now()
	zerolog.Ctx(ctx).Trace().
		Str("mod", kModBulk).
		Int("cnt", queue.cnt).
		Int("szPending", queue.pending).
		Str("queue", queue.Type()).
		Msg("flushQueue Wait")

	acquireCtx, cancel := context.WithTimeout(ctx, defaultFlushContextTimeout)
	defer cancel()

	if err := w.Acquire(acquireCtx, 1); err != nil {
		zerolog.Ctx(ctx).Error().Err(err).Msg("flushQueue Wait error")
		return err
	}

	zerolog.Ctx(ctx).Trace().
		Str("mod", kModBulk).
		Int("cnt", queue.cnt).
		Dur("tdiff", time.Since(start)).
		Int("szPending", queue.pending).
		Str("queue", queue.Type()).
		Msg("flushQueue Acquired")

	go func() {
		start := time.Now()

		// deadline prevents bulker being blocked on flush
		flushCtx, cancel := context.WithTimeout(ctx, defaultFlushContextTimeout)
		defer cancel()

		if b.tracer != nil {
			trans := b.tracer.StartTransaction(fmt.Sprintf("Flush queue %s", queue.Type()), "bulker")
			trans.Context.SetLabel("queue.size", queue.cnt)
			trans.Context.SetLabel("queue.pending", queue.pending)
			ctx = apm.ContextWithTransaction(ctx, trans)
			defer trans.End()
		}

		defer w.Release(1)

		var err error
		switch queue.ty {
		case kQueueRead, kQueueRefreshRead:
			err = b.flushRead(flushCtx, queue)
		case kQueueSearch, kQueueFleetSearch:
			err = b.flushSearch(flushCtx, queue)
		case kQueueAPIKeyUpdate:
			err = b.flushUpdateAPIKey(flushCtx, queue)
		default:
			err = b.flushBulk(flushCtx, queue)
		}

		if err != nil {
			failQueue(queue, err)
			apm.CaptureError(ctx, err).Send()
		}

		zerolog.Ctx(ctx).Trace().
			Err(err).
			Str("mod", kModBulk).
			Int("cnt", queue.cnt).
			Int("szPending", queue.pending).
			Str("queue", queue.Type()).
			Dur("rtt", time.Since(start)).
			Msg("flushQueue Done")

	}()

	return nil
}