in origin/blobclient/cluster_client.go [132:192]
// UploadBlob uploads blob for digest d under namespace to the origin cluster.
//
// Origins are tried in the order returned by the resolver. The first origin
// (highest hashing score) is preferred so that it handles replication to the
// lower-scored origins, which reduces upload conflicts between local replicas.
// When an attempt fails with a network or retryable error and another origin
// remains, blob is rewound to its start and the next origin is tried; any
// other error aborts immediately.
//
// It returns nil on the first successful upload. Otherwise it returns the
// resolution error (also when no origins resolve for d), the first
// non-retryable upload error, a rewind error if blob cannot be seeked back for
// a retry, or the last upload error once every origin has been tried or ctx is
// done.
func (c *clusterClient) UploadBlob(ctx context.Context, namespace string, d core.Digest, blob io.ReadSeeker) (err error) {
	digest := d.Hex()
	ctx, span := otel.Tracer("kraken-origin-cluster-client").Start(ctx, "cluster.upload_blob",
		trace.WithSpanKind(trace.SpanKindClient),
		trace.WithAttributes(
			attribute.String("component", "origin-cluster-client"),
			attribute.String("operation", "upload_blob"),
			attribute.String("namespace", namespace),
			attribute.String("blob.digest", digest),
		),
	)
	defer span.End()

	clients, err := c.resolver.Resolve(d)
	if err != nil {
		span.RecordError(err)
		span.SetStatus(codes.Error, "failed to resolve clients")
		return fmt.Errorf("resolve clients: %w", err)
	}
	if len(clients) == 0 {
		// Without this guard the loop below never runs and the nil error from
		// Resolve would be returned, reporting success for an upload that
		// never happened.
		err = fmt.Errorf("resolve clients: no origins resolved for digest %s", digest)
		span.RecordError(err)
		span.SetStatus(codes.Error, "no origins resolved")
		return err
	}
	span.SetAttributes(attribute.Int("cluster.origin_count", len(clients)))
	log.WithTraceContext(ctx).With("namespace", namespace, "digest", digest).Debug("Starting blob upload to origin cluster")

	// We prefer the origin with highest hashing score so the first origin will handle
	// replication to origins with lower score. This is because we want to reduce upload
	// conflicts between local replicas.
	for i, client := range clients {
		originAddr := client.Addr()
		span.SetAttributes(attribute.Int("cluster.attempt", i))
		log.WithTraceContext(ctx).With("namespace", namespace, "digest", digest, "origin", originAddr, "attempt", i).Debug("Attempting blob upload to origin")

		err = client.UploadBlob(ctx, namespace, d, blob)
		if err == nil {
			log.WithTraceContext(ctx).With("namespace", namespace, "digest", digest, "origin", originAddr).Debug("Blob upload succeeded")
			span.SetAttributes(attribute.String("cluster.successful_origin", originAddr))
			span.SetStatus(codes.Ok, "upload succeeded")
			return nil
		}
		log.WithTraceContext(ctx).With("namespace", namespace, "digest", digest, "origin", originAddr, "error", err).Error("Blob upload failed")

		// Non-retryable error - don't try other origins.
		if !httputil.IsNetworkError(err) && !httputil.IsRetryable(err) {
			span.RecordError(err)
			span.SetStatus(codes.Error, "non-retryable error")
			return err
		}

		// No origin left to retry on: skip the rewind so that a seek failure
		// cannot mask the real upload error.
		if i == len(clients)-1 {
			break
		}

		// The caller has given up; any further attempt would fail as well.
		if ctx.Err() != nil {
			span.RecordError(err)
			span.SetStatus(codes.Error, "context done before retry")
			return err
		}

		// Allow retry on another origin if the current upstream is temporarily
		// unavailable or under high load. The previous attempt may have
		// consumed part of blob, so rewind it first.
		log.WithTraceContext(ctx).With("namespace", namespace, "digest", digest, "origin", originAddr, "attempt", i).Debug("Rewinding blob reader for retry")
		if _, seekErr := blob.Seek(0, io.SeekStart); seekErr != nil {
			log.WithTraceContext(ctx).With("namespace", namespace, "digest", digest, "error", seekErr).Error("Failed to rewind blob reader for retry")
			span.RecordError(seekErr)
			span.SetStatus(codes.Error, "failed to rewind blob")
			// i is 0-based, so i+1 attempts have been made at this point.
			return fmt.Errorf("rewind blob for retry after %d attempts (last upload error: %v): %w", i+1, err, seekErr)
		}
	}

	span.RecordError(err)
	span.SetStatus(codes.Error, "all origins failed")
	return err
}