func()

in origin/blobclient/cluster_client.go [132:192]


// UploadBlob uploads blob, addressed by digest d, to the origin cluster under
// namespace.
//
// Origins are tried in the order returned by c.resolver. An attempt that fails
// with a network error or a retryable error (as classified by httputil) moves on
// to the next origin. Any other error is returned immediately. Before each retry
// the blob reader is rewound to its start so the next origin is handed the
// reader from offset 0.
//
// It returns an error if resolving origins fails or yields no origins, if the
// blob cannot be rewound before a retry, or if every origin fails. In the last
// case the final upload error is returned. The operation is recorded as a
// client span named "cluster.upload_blob".
func (c *clusterClient) UploadBlob(ctx context.Context, namespace string, d core.Digest, blob io.ReadSeeker) (err error) {
	ctx, span := otel.Tracer("kraken-origin-cluster-client").Start(ctx, "cluster.upload_blob",
		trace.WithSpanKind(trace.SpanKindClient),
		trace.WithAttributes(
			attribute.String("component", "origin-cluster-client"),
			attribute.String("operation", "upload_blob"),
			attribute.String("namespace", namespace),
			attribute.String("blob.digest", d.Hex()),
		),
	)
	defer span.End()

	clients, err := c.resolver.Resolve(d)
	if err != nil {
		span.RecordError(err)
		span.SetStatus(codes.Error, "failed to resolve clients")
		// %w keeps the resolver error inspectable by callers via errors.Is/As.
		return fmt.Errorf("resolve clients: %w", err)
	}
	if len(clients) == 0 {
		// Without this guard the loop below never runs and err (nil from Resolve)
		// would be returned, reporting success although nothing was uploaded.
		err = fmt.Errorf("resolve clients: no origins found for digest %s", d.Hex())
		span.RecordError(err)
		span.SetStatus(codes.Error, "no origins resolved")
		return err
	}

	span.SetAttributes(attribute.Int("cluster.origin_count", len(clients)))
	log.WithTraceContext(ctx).With("namespace", namespace, "digest", d.Hex()).Debug("Starting blob upload to origin cluster")

	// We prefer the origin with highest hashing score so the first origin will handle
	// replication to origins with lower score. This is because we want to reduce upload
	// conflicts between local replicas.
	for i, client := range clients {
		originAddr := client.Addr()

		if i > 0 {
			// A previous attempt may have advanced the reader. Rewind only when
			// another origin is actually about to be tried, so a rewind failure
			// can never replace the final upload error.
			log.WithTraceContext(ctx).With("namespace", namespace, "digest", d.Hex(), "origin", originAddr, "attempt", i).Debug("Rewinding blob reader for retry")
			if _, seekErr := blob.Seek(0, io.SeekStart); seekErr != nil {
				log.WithTraceContext(ctx).With("namespace", namespace, "digest", d.Hex(), "error", seekErr).Error("Failed to rewind blob reader for retry")
				span.RecordError(seekErr)
				span.SetStatus(codes.Error, "failed to rewind blob")
				// Attempts 0..i-1 have completed, so exactly i attempts were made.
				return fmt.Errorf("rewind blob for retry after %d attempts: %w", i, seekErr)
			}
		}

		span.SetAttributes(attribute.Int("cluster.attempt", i))

		log.WithTraceContext(ctx).With("namespace", namespace, "digest", d.Hex(), "origin", originAddr, "attempt", i).Debug("Attempting blob upload to origin")
		err = client.UploadBlob(ctx, namespace, d, blob)
		if err == nil {
			log.WithTraceContext(ctx).With("namespace", namespace, "digest", d.Hex(), "origin", originAddr).Debug("Blob upload succeeded")
			span.SetAttributes(attribute.String("cluster.successful_origin", originAddr))
			span.SetStatus(codes.Ok, "upload succeeded")
			return nil
		}
		log.WithTraceContext(ctx).With("namespace", namespace, "digest", d.Hex(), "origin", originAddr, "error", err).Error("Blob upload failed")

		// Non-retryable error - don't try other origins.
		if !httputil.IsNetworkError(err) && !httputil.IsRetryable(err) {
			span.RecordError(err)
			span.SetStatus(codes.Error, "non-retryable error")
			return err
		}
		// Otherwise the current upstream is presumed temporarily unavailable or
		// under high load; fall through to the next origin.
	}

	// Reached only when every origin failed with a retryable error; err holds
	// the last failure and is non-nil because clients is non-empty.
	span.RecordError(err)
	span.SetStatus(codes.Error, "all origins failed")
	return err
}