func()

in internal/pkg/bulk/opBulk.go [168:294]


// flushBulk drains the queued operations in queue, issues a single
// Elasticsearch _bulk request, and dispatches each item's result to the
// response channel of the queue node that enqueued it.
//
// WARNING: once results are pushed onto the per-node channels the queue's
// node pointers are invalid; after that point this function must return nil
// so callers do not attempt to fail the queue a second time.
func (b *Bulker) flushBulk(ctx context.Context, queue queueT) error {
	start := time.Now()

	const kRoughEstimatePerItem = 200

	// Size the request buffer from whichever estimate is larger:
	// a rough per-item guess or the tracked pending byte count.
	bufSz := queue.cnt * kRoughEstimatePerItem
	if bufSz < queue.pending {
		bufSz = queue.pending
	}

	var buf bytes.Buffer
	buf.Grow(bufSz)

	// Concatenate the per-node payloads into one request body and
	// collect any APM span links for the flush span.
	// links stays nil when no node carries a span link (e.g. in tests),
	// preserving apm.StartSpanOptions' default behavior.
	queueCnt := 0
	var links []apm.SpanLink
	for n := queue.head; n != nil; n = n.next {
		buf.Write(n.buf.Bytes())
		queueCnt++
		if n.spanLink != nil {
			links = append(links, *n.spanLink)
		}
	}

	span, ctx := apm.StartSpanOptions(ctx, fmt.Sprintf("Flush: %s", queue.Type()), queue.Type(), apm.SpanOptions{
		Links: links,
	})
	defer span.End()

	// Do actual bulk request; defer to the client
	req := esapi.BulkRequest{
		Body: bytes.NewReader(buf.Bytes()),
	}

	if queue.ty == kQueueRefreshBulk {
		req.Refresh = "true"
	}

	res, err := req.Do(ctx, b.es)
	if err != nil {
		zerolog.Ctx(ctx).Error().Err(err).Str("mod", kModBulk).Msg("Fail BulkRequest req.Do")
		return err
	}

	if res.Body != nil {
		defer res.Body.Close()
	}

	if res.IsError() {
		zerolog.Ctx(ctx).Error().Str("mod", kModBulk).Str("error.message", res.String()).Msg("Fail BulkRequest result")
		return parseError(res, zerolog.Ctx(ctx))
	}

	// Reuse the request buffer to hold the response body.
	buf.Reset()

	bodySz, err := buf.ReadFrom(res.Body)
	if err != nil {
		zerolog.Ctx(ctx).Error().
			Err(err).
			Str("mod", kModBulk).
			Msg("Response error")
		return err
	}

	var blk bulkIndexerResponse
	blk.Items = make([]bulkStubItem, 0, queueCnt)

	// TODO: We're losing information about the errors, we should check a way
	// to return the full error ES returns
	if err = easyjson.Unmarshal(buf.Bytes(), &blk); err != nil {
		zerolog.Ctx(ctx).Error().Err(err).
			Str("mod", kModBulk).
			Msg("flushBulk failed, could not unmarshal ES response")
		return fmt.Errorf("flushBulk failed, could not unmarshal ES response: %w", err)
	}
	if blk.HasErrors {
		// We lack information to properly correlate this error with what has failed.
		// Thus, for now it'd be more noise than information outside an investigation.
		zerolog.Ctx(ctx).Debug().Err(errors.New(buf.String())).Msg("Bulk call: Es returned an error")
	}

	// NOTE: err is guaranteed nil at this point (all earlier non-nil paths
	// returned), so only request/response stats are logged here.
	zerolog.Ctx(ctx).Trace().
		Bool("refresh", queue.ty == kQueueRefreshBulk).
		Str("mod", kModBulk).
		Int("took", blk.Took).
		Dur("rtt", time.Since(start)).
		Bool("hasErrors", blk.HasErrors).
		Int("cnt", len(blk.Items)).
		Int("bufSz", bufSz).
		Int64("bodySz", bodySz).
		Msg("flushBulk")

	// Every queued item must have a matching response item or we cannot
	// safely correlate results back to their nodes.
	if len(blk.Items) != queueCnt {
		return errors.New("bulk queue length mismatch")
	}

	// WARNING: Once we start pushing items to
	// the queue, the node pointers are invalid.
	// Do NOT return a non-nil value or failQueue
	// up the stack will fail.

	n := queue.head
	for i := range blk.Items {
		next := n.next // 'n' is invalid immediately on channel send

		item := blk.Items[i].Choose()
		select {
		case n.ch <- respT{
			err:  item.deriveError(),
			idx:  n.idx,
			data: item,
		}:
		default:
			// Response channels must have capacity for their reply; a blocked
			// send indicates a programming error, not a recoverable condition.
			panic("Unexpected blocked response channel on flushBulk")
		}

		n = next
	}

	return nil
}