in appender.go [278:472]
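// flush sends the bulk request accumulated in bulkIndexer and records
// metrics, logs, and (when enabled) trace data for the outcome. It returns
// an error only when the request as a whole fails; per-document failures
// are reported via metrics and logs instead.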
func (a *Appender) flush(ctx context.Context, bulkIndexer *BulkIndexer) error {
	n := bulkIndexer.Items()
	if n == 0 {
		return nil
	}
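	// Count this bulk request up front; the deferred call runs on every
	// return path below, so failed requests are counted too.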
	defer a.addCount(1, &a.bulkRequests, a.metrics.bulkRequests)
	logger := a.config.Logger
	var span trace.Span
	if a.otelTracingEnabled() {
		ctx, span = a.tracer.Start(ctx, "docappender.flush", trace.WithAttributes(
			attribute.Int("documents", n),
		))
		defer span.End()
		// Add trace IDs to the logger, to associate any per-item errors
		// below with the trace.
		logger = logger.With(
			zap.String("traceId", span.SpanContext().TraceID().String()),
			zap.String("spanId", span.SpanContext().SpanID().String()),
		)
	}
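	// Bound the flush with the configured timeout, if one is set.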
	var flushCtx context.Context
	if a.config.FlushTimeout != 0 {
		var flushCancel context.CancelFunc
		flushCtx, flushCancel = context.WithTimeout(ctx, a.config.FlushTimeout)
		defer flushCancel()
	} else {
		flushCtx = ctx
	}
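	// Flush sends the bulk request and blocks until Elasticsearch responds
	// or the context is done.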
	resp, err := bulkIndexer.Flush(flushCtx)
	// Record the BulkIndexer buffer's length as the bytesTotal metric after
	// the request has been flushed.
	if flushed := bulkIndexer.BytesFlushed(); flushed > 0 {
		a.addCount(int64(flushed), &a.bytesTotal, a.metrics.bytesTotal)
	}
	// Record the uncompressed bytes written to the BulkIndexer buffer as the
	// bytesUncompressedTotal metric after the request has been flushed.
	if flushed := bulkIndexer.BytesUncompressedFlushed(); flushed > 0 {
		a.addCount(int64(flushed), &a.bytesUncompressedTotal, a.metrics.bytesUncompressedTotal)
	}
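	// A request-level error means none of the n buffered documents were
	// indexed: mark them all as no longer active and as failed.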
	if err != nil {
		a.addUpDownCount(-int64(n), &a.docsActive, a.metrics.docsActive)
		atomic.AddInt64(&a.docsFailed, int64(n))
		logger.Error("bulk indexing request failed", zap.Error(err))
		if a.otelTracingEnabled() && span.IsRecording() {
			span.RecordError(err)
			span.SetStatus(codes.Error, "bulk indexing request failed")
		}
		if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
			a.addCount(int64(n), nil,
				a.metrics.docsIndexed,
				metric.WithAttributes(attribute.String("status", "Timeout")),
			)
		}
		// Bulk indexing may fail with different status codes.
		var errFailed ErrorFlushFailed
		if errors.As(err, &errFailed) {
			var legacy *int64
			var status string
			switch {
			case errFailed.tooMany:
				legacy, status = &a.tooManyRequests, "TooMany"
			case errFailed.clientError:
				legacy, status = &a.docsFailedClient, "FailedClient"
			case errFailed.serverError:
				legacy, status = &a.docsFailedServer, "FailedServer"
			}
			if status != "" {
				a.addCount(int64(n), legacy, a.metrics.docsIndexed,
					metric.WithAttributes(attribute.String("status", status), semconv.HTTPResponseStatusCode(errFailed.statusCode)),
				)
			}
		}
		return err
	}
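	// The request itself succeeded; account for documents individually,
	// since items within a single bulk response succeed or fail
	// independently.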
	var docsFailed, docsIndexed,
		// breakdown of failed docs:
		tooManyRequests, // failed after document retries (if they apply) with final status 429
		clientFailed, // failed after document retries (if they apply) with final status 4xx, excluding 429
		serverFailed int64 // failed after document retries (if they apply) with final status 5xx
	failureStoreDocs := resp.FailureStoreDocs
	docsIndexed = resp.Indexed
	var failedCount map[BulkIndexerResponseItem]int
	if len(resp.FailedDocs) > 0 {
		failedCount = make(map[BulkIndexerResponseItem]int, len(resp.FailedDocs))
	}
	docsFailed = int64(len(resp.FailedDocs))
	totalFlushed := docsFailed + docsIndexed
	a.addUpDownCount(-totalFlushed, &a.docsActive, a.metrics.docsActive)
	for _, info := range resp.FailedDocs {
		if info.Status >= 400 && info.Status < 500 {
			if info.Status == http.StatusTooManyRequests {
				tooManyRequests++
			} else {
				clientFailed++
			}
		}
		if info.Status >= 500 {
			serverFailed++
		}
		info.Position = 0 // reset position so that the response item can be used as a key in the map
		failedCount[info]++
		if a.otelTracingEnabled() && span.IsRecording() {
			e := errors.New(info.Error.Reason)
			span.RecordError(e)
			span.SetStatus(codes.Error, e.Error())
		}
	}
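	// Log one aggregated error per distinct failure (index, error type and
	// reason) rather than one line per failed document.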
	for key, count := range failedCount {
		logger.Error(fmt.Sprintf("failed to index documents in '%s' (%s): %s",
			key.Index, key.Error.Type, key.Error.Reason,
		), zap.Int("documents", count))
	}
	if docsFailed > 0 {
		atomic.AddInt64(&a.docsFailed, docsFailed)
	}
	if resp.RetriedDocs > 0 {
		// Docs are scheduled to be retried, but have not yet failed by
		// reaching the retry limit.
		a.addCount(resp.RetriedDocs, nil, a.metrics.docsRetried,
			metric.WithAttributes(attribute.Int("greatest_retry", resp.GreatestRetry)),
		)
	}
	if docsIndexed > 0 {
		a.addCount(docsIndexed, &a.docsIndexed,
			a.metrics.docsIndexed,
			metric.WithAttributes(attribute.String("status", "Success")),
		)
	}
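	// Failure breakdowns below are recorded on the same docsIndexed counter,
	// distinguished by the "status" attribute.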
	if tooManyRequests > 0 {
		a.addCount(tooManyRequests, &a.tooManyRequests,
			a.metrics.docsIndexed,
			metric.WithAttributes(attribute.String("status", "TooMany")),
		)
	}
	if clientFailed > 0 {
		a.addCount(clientFailed, &a.docsFailedClient,
			a.metrics.docsIndexed,
			metric.WithAttributes(attribute.String("status", "FailedClient")),
		)
	}
	if serverFailed > 0 {
		a.addCount(serverFailed, &a.docsFailedServer,
			a.metrics.docsIndexed,
			metric.WithAttributes(attribute.String("status", "FailedServer")),
		)
	}
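	// Documents routed to a failure store are additionally broken down by
	// the "failure_store" attribute.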
	if failureStoreDocs.Used > 0 {
		a.addCount(failureStoreDocs.Used, nil,
			a.metrics.docsIndexed,
			metric.WithAttributes(
				attribute.String("status", "FailureStore"),
				attribute.String("failure_store", string(FailureStoreStatusUsed)),
			),
		)
	}
	if failureStoreDocs.Failed > 0 {
		a.addCount(failureStoreDocs.Failed, nil,
			a.metrics.docsIndexed,
			metric.WithAttributes(
				attribute.String("status", "FailureStore"),
				attribute.String("failure_store", string(FailureStoreStatusFailed)),
			),
		)
	}
	if failureStoreDocs.NotEnabled > 0 {
		a.addCount(failureStoreDocs.NotEnabled, nil,
			a.metrics.docsIndexed,
			metric.WithAttributes(
				attribute.String("status", "FailureStore"),
				attribute.String("failure_store", string(FailureStoreStatusNotEnabled)),
			),
		)
	}
	logger.Debug(
		"bulk request completed",
		zap.Int64("docs_indexed", docsIndexed),
		zap.Int64("docs_failed", docsFailed),
		zap.Int64("docs_rate_limited", tooManyRequests),
		zap.Int64("docs_failure_store_used", failureStoreDocs.Used),
		zap.Int64("docs_failure_store_failed", failureStoreDocs.Failed),
		zap.Int64("docs_failure_store_not_enabled", failureStoreDocs.NotEnabled),
	)
	if a.otelTracingEnabled() && span.IsRecording() {
		span.SetStatus(codes.Ok, "")
	}
	return nil
}
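
// Note: addCount and addUpDownCount are defined elsewhere in appender.go and
// are outside this excerpt. A minimal sketch of their presumed shape, inferred
// from the call sites above (hypothetical, for illustration only):
//
//	func (a *Appender) addCount(delta int64, legacy *int64, counter metric.Int64Counter, opts ...metric.AddOption) {
//		if legacy != nil {
//			// Keep the legacy atomic counter in sync with the OTel counter.
//			atomic.AddInt64(legacy, delta)
//		}
//		counter.Add(context.Background(), delta, opts...)
//	}
//
//	func (a *Appender) addUpDownCount(delta int64, legacy *int64, counter metric.Int64UpDownCounter, opts ...metric.AddOption) {
//		if legacy != nil {
//			atomic.AddInt64(legacy, delta)
//		}
//		counter.Add(context.Background(), delta, opts...)
//	}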