func()

in appender.go [278:472]


// flush flushes the items buffered in bulkIndexer and records the outcome in
// the appender's counters, its logger and, when OpenTelemetry tracing is
// enabled, a "docappender.flush" span.
//
// When bulkIndexer holds no items flush returns nil immediately. Otherwise
// the bulk request counter is incremented when flush returns, regardless of
// the outcome. If a.config.FlushTimeout is non-zero the flush is performed
// with a context bounded by that timeout. An error from bulkIndexer.Flush is
// returned unchanged; failed documents reported by a successful response are
// counted and logged but do not make flush return an error.
func (a *Appender) flush(ctx context.Context, bulkIndexer *BulkIndexer) error {
	numItems := bulkIndexer.Items()
	if numItems == 0 {
		return nil
	}
	defer a.addCount(1, &a.bulkRequests, a.metrics.bulkRequests)

	var span trace.Span
	flushLogger := a.config.Logger
	if a.otelTracingEnabled() {
		ctx, span = a.tracer.Start(ctx, "docappender.flush", trace.WithAttributes(
			attribute.Int("documents", numItems),
		))
		defer span.End()

		// Tag the logger with the trace and span IDs so that the log
		// entries written below can be associated with this span.
		spanCtx := span.SpanContext()
		flushLogger = flushLogger.With(
			zap.String("traceId", spanCtx.TraceID().String()),
			zap.String("spanId", spanCtx.SpanID().String()),
		)
	}

	// isRecording reports whether span data should be written. The tracing
	// check must come first: span is only assigned when tracing is enabled.
	isRecording := func() bool {
		return a.otelTracingEnabled() && span.IsRecording()
	}

	// Bound the flush with the configured timeout, if there is one.
	flushCtx := ctx
	if timeout := a.config.FlushTimeout; timeout != 0 {
		var cancel context.CancelFunc
		flushCtx, cancel = context.WithTimeout(ctx, timeout)
		defer cancel()
	}

	resp, err := bulkIndexer.Flush(flushCtx)

	// The byte counters are updated after the flush, before the error is
	// inspected: bytesTotal from BytesFlushed and bytesUncompressedTotal
	// from BytesUncompressedFlushed.
	if size := bulkIndexer.BytesFlushed(); size > 0 {
		a.addCount(int64(size), &a.bytesTotal, a.metrics.bytesTotal)
	}
	if size := bulkIndexer.BytesUncompressedFlushed(); size > 0 {
		a.addCount(int64(size), &a.bytesUncompressedTotal, a.metrics.bytesUncompressedTotal)
	}

	if err != nil {
		// The whole request failed, so every buffered item is counted
		// as failed and removed from the active count.
		lost := int64(numItems)
		a.addUpDownCount(-lost, &a.docsActive, a.metrics.docsActive)
		atomic.AddInt64(&a.docsFailed, lost)
		flushLogger.Error("bulk indexing request failed", zap.Error(err))
		if isRecording() {
			span.RecordError(err)
			span.SetStatus(codes.Error, "bulk indexing request failed")
		}

		timedOut := errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)
		if timedOut {
			a.addCount(lost, nil,
				a.metrics.docsIndexed,
				metric.WithAttributes(attribute.String("status", "Timeout")),
			)
		}

		// A failed flush may carry a status classification; when it
		// does, attribute the items to the matching counter.
		var flushErr ErrorFlushFailed
		if !errors.As(err, &flushErr) {
			return err
		}
		var (
			legacy *int64
			status string
		)
		switch {
		case flushErr.tooMany:
			legacy, status = &a.tooManyRequests, "TooMany"
		case flushErr.clientError:
			legacy, status = &a.docsFailedClient, "FailedClient"
		case flushErr.serverError:
			legacy, status = &a.docsFailedServer, "FailedServer"
		default:
			return err
		}
		a.addCount(lost, legacy, a.metrics.docsIndexed,
			metric.WithAttributes(attribute.String("status", status), semconv.HTTPResponseStatusCode(flushErr.statusCode)),
		)
		return err
	}

	// Breakdown of failed documents by final status, after any document
	// retries that apply.
	var (
		tooManyRequests int64 // final status is 429
		clientFailed    int64 // final status is 4xx other than 429
		serverFailed    int64 // final status is 5xx
	)
	failureStore := resp.FailureStoreDocs
	docsIndexed := resp.Indexed
	docsFailed := int64(len(resp.FailedDocs))

	// The map is only allocated when there is something to put in it.
	var perFailure map[BulkIndexerResponseItem]int
	if docsFailed > 0 {
		perFailure = make(map[BulkIndexerResponseItem]int, len(resp.FailedDocs))
	}

	a.addUpDownCount(-(docsFailed + docsIndexed), &a.docsActive, a.metrics.docsActive)

	for _, item := range resp.FailedDocs {
		code := item.Status
		switch {
		case code == http.StatusTooManyRequests:
			tooManyRequests++
		case code >= 400 && code < 500:
			clientFailed++
		case code >= 500:
			serverFailed++
		}

		// Zero the position so that items differing only by position
		// collapse into a single map key.
		item.Position = 0
		perFailure[item]++

		if isRecording() {
			itemErr := errors.New(item.Error.Reason)
			span.RecordError(itemErr)
			span.SetStatus(codes.Error, itemErr.Error())
		}
	}

	// One log entry per distinct failure, carrying its occurrence count.
	for item, occurrences := range perFailure {
		flushLogger.Error(fmt.Sprintf("failed to index documents in '%s' (%s): %s",
			item.Index, item.Error.Type, item.Error.Reason,
		), zap.Int("documents", occurrences))
	}

	if docsFailed > 0 {
		atomic.AddInt64(&a.docsFailed, docsFailed)
	}
	if retried := resp.RetriedDocs; retried > 0 {
		// Retried documents are scheduled for another attempt; they have
		// not yet failed for reaching the retry limit.
		a.addCount(retried, nil, a.metrics.docsRetried,
			metric.WithAttributes(attribute.Int("greatest_retry", resp.GreatestRetry)),
		)
	}

	// Per-status document counts, emitted in a fixed order and only when
	// non-zero.
	statusCounts := [...]struct {
		count  int64
		legacy *int64
		status string
	}{
		{docsIndexed, &a.docsIndexed, "Success"},
		{tooManyRequests, &a.tooManyRequests, "TooMany"},
		{clientFailed, &a.docsFailedClient, "FailedClient"},
		{serverFailed, &a.docsFailedServer, "FailedServer"},
	}
	for _, sc := range statusCounts {
		if sc.count > 0 {
			a.addCount(sc.count, sc.legacy,
				a.metrics.docsIndexed,
				metric.WithAttributes(attribute.String("status", sc.status)),
			)
		}
	}

	// Failure store counts share the "FailureStore" status and are told
	// apart by the failure_store attribute.
	failureStoreCounts := [...]struct {
		count int64
		state string
	}{
		{failureStore.Used, string(FailureStoreStatusUsed)},
		{failureStore.Failed, string(FailureStoreStatusFailed)},
		{failureStore.NotEnabled, string(FailureStoreStatusNotEnabled)},
	}
	for _, fc := range failureStoreCounts {
		if fc.count > 0 {
			a.addCount(fc.count, nil,
				a.metrics.docsIndexed,
				metric.WithAttributes(
					attribute.String("status", "FailureStore"),
					attribute.String("failure_store", fc.state),
				),
			)
		}
	}

	flushLogger.Debug(
		"bulk request completed",
		zap.Int64("docs_indexed", docsIndexed),
		zap.Int64("docs_failed", docsFailed),
		zap.Int64("docs_rate_limited", tooManyRequests),
		zap.Int64("docs_failure_store_used", failureStore.Used),
		zap.Int64("docs_failure_store_failed", failureStore.Failed),
		zap.Int64("docs_failure_store_not_enabled", failureStore.NotEnabled),
	)
	if isRecording() {
		span.SetStatus(codes.Ok, "")
	}
	return nil
}