func()

in origin/blobserver/server.go [824:912]


// commitClusterUploadHandler commits an in-progress cluster upload and then
// fans the committed blob out to the replicas selected by applyToReplicas.
//
// The request must carry the "digest", "namespace" and "uid" parameters; a
// failure to parse any of them is recorded on the span and returned as-is.
//
// A commit failure is routed through handleUploadConflict, and a write-back
// failure is returned to the caller. A replication failure, by contrast, is
// only logged and recorded on the span: the blob is already committed locally,
// so the handler still returns nil.
func (s *Server) commitClusterUploadHandler(w http.ResponseWriter, r *http.Request) error {
	ctx, span := s.tracer.Start(r.Context(), "origin.commit_upload",
		trace.WithSpanKind(trace.SpanKindServer),
		trace.WithAttributes(
			attribute.String("component", "origin"),
			attribute.String("operation", "commit_cluster_upload"),
		),
	)
	defer span.End()

	d, err := httputil.ParseDigest(r, "digest")
	if err != nil {
		span.RecordError(err)
		span.SetStatus(codes.Error, "parse digest failed")
		return err
	}
	namespace, err := httputil.ParseParam(r, "namespace")
	if err != nil {
		span.RecordError(err)
		span.SetStatus(codes.Error, "parse namespace failed")
		return err
	}
	uid, err := httputil.ParseParam(r, "uid")
	if err != nil {
		span.RecordError(err)
		span.SetStatus(codes.Error, "parse uid failed")
		return err
	}

	span.SetAttributes(
		attribute.String("namespace", namespace),
		attribute.String("blob.digest", d.Hex()),
		attribute.String("upload.uid", uid),
	)

	log.WithTraceContext(ctx).With("namespace", namespace, "digest", d.Hex(), "uid", uid).Info("Committing cluster upload")
	if err := s.uploader.commit(d, uid); err != nil {
		span.RecordError(err)
		span.SetStatus(codes.Error, "commit upload failed")
		log.WithTraceContext(ctx).With("namespace", namespace, "digest", d.Hex(), "uid", uid).Errorf("Failed to commit cluster upload: %s", err)
		return s.handleUploadConflict(ctx, err, namespace, d)
	}

	// NOTE(review): the trailing 0 is presumably "no delay" for the local
	// write-back — confirm against writeBack's signature.
	if err := s.writeBack(ctx, namespace, d, 0); err != nil {
		span.RecordError(err)
		span.SetStatus(codes.Error, "writeback initiation failed")
		log.WithTraceContext(ctx).With("namespace", namespace, "digest", d.Hex()).Errorf("Failed to write back blob: %s", err)
		return err
	}

	// The blob size is used only for logging and span attributes, so a stat
	// failure is deliberately tolerated and the size is reported as 0.
	var blobSize int64
	if fi, err := s.cas.GetCacheFileStat(d.Hex()); err == nil {
		blobSize = fi.Size()
	}
	span.SetAttributes(attribute.Int64("blob.size_bytes", blobSize))

	replicateStart := time.Now()
	log.WithTraceContext(ctx).With("namespace", namespace, "digest", d.Hex(), "size_bytes", blobSize).Debug("Replicating upload to other origins")
	err = s.applyToReplicas(d, func(i int, client blobclient.Client) error {
		replicaStart := time.Now()
		// Each replica gets a progressively larger stagger (1x, 2x, ... the
		// configured DuplicateWriteBackStagger), passed through as the delay.
		delay := s.config.DuplicateWriteBackStagger * time.Duration(i+1)
		f, err := s.cas.GetCacheFileReader(d.Hex())
		if err != nil {
			return fmt.Errorf("get cache file: %w", err)
		}
		// Every replica opens its own reader; close it once the duplicate
		// upload finishes so the handle is not leaked on either path.
		defer f.Close()

		if err := client.DuplicateUploadBlob(namespace, d, f, delay); err != nil {
			duration := time.Since(replicaStart)
			log.WithTraceContext(ctx).With("namespace", namespace, "digest", d.Hex(), "replica", client.Addr(), "size_bytes", blobSize, "duration_s", duration.Seconds()).Errorf("Failed to duplicate upload: %s", err)
			return fmt.Errorf("duplicate upload: %w", err)
		}
		duration := time.Since(replicaStart)
		log.WithTraceContext(ctx).With("namespace", namespace, "digest", d.Hex(), "replica", client.Addr(), "size_bytes", blobSize, "duration_s", duration.Seconds()).Debug("Successfully duplicated upload")
		return nil
	})
	replicateDuration := time.Since(replicateStart)
	span.SetAttributes(attribute.Int64("replication.duration_ms", replicateDuration.Milliseconds()))

	if err != nil {
		span.SetAttributes(attribute.String("replication.error", err.Error()))
		log.WithTraceContext(ctx).With("namespace", namespace, "digest", d.Hex(), "replication_duration_ms", replicateDuration.Milliseconds()).Errorf("Error duplicating write-back task to replicas: %s", err)
		// Don't fail the commit if replication fails - blob is still uploaded
	}

	log.WithTraceContext(ctx).With("namespace", namespace, "digest", d.Hex()).Info("Successfully committed cluster upload")
	// The span is marked Ok even when replication failed; in that case the
	// failure is visible through the "replication.error" attribute set above.
	span.SetStatus(codes.Ok, "upload committed and replicated")
	return nil
}