in origin/blobserver/server.go [824:912]
// commitClusterUploadHandler commits a cluster upload identified by the
// "digest", "namespace" and "uid" request parameters, triggers write-back of
// the committed blob, and then duplicates the blob to the replicas selected by
// applyToReplicas.
//
// Errors from parameter parsing, the commit itself (translated through
// handleUploadConflict) and write-back are returned to the caller. Replication
// errors are only logged and recorded on the span: the blob is already
// committed locally at that point, so the handler still returns nil.
func (s *Server) commitClusterUploadHandler(w http.ResponseWriter, r *http.Request) error {
	ctx, span := s.tracer.Start(r.Context(), "origin.commit_upload",
		trace.WithSpanKind(trace.SpanKindServer),
		trace.WithAttributes(
			attribute.String("component", "origin"),
			attribute.String("operation", "commit_cluster_upload"),
		),
	)
	defer span.End()

	d, err := httputil.ParseDigest(r, "digest")
	if err != nil {
		span.RecordError(err)
		span.SetStatus(codes.Error, "parse digest failed")
		return err
	}
	namespace, err := httputil.ParseParam(r, "namespace")
	if err != nil {
		span.RecordError(err)
		span.SetStatus(codes.Error, "parse namespace failed")
		return err
	}
	uid, err := httputil.ParseParam(r, "uid")
	if err != nil {
		span.RecordError(err)
		span.SetStatus(codes.Error, "parse uid failed")
		return err
	}

	span.SetAttributes(
		attribute.String("namespace", namespace),
		attribute.String("blob.digest", d.Hex()),
		attribute.String("upload.uid", uid),
	)

	log.WithTraceContext(ctx).With("namespace", namespace, "digest", d.Hex(), "uid", uid).Info("Committing cluster upload")

	if err := s.uploader.commit(d, uid); err != nil {
		span.RecordError(err)
		span.SetStatus(codes.Error, "commit upload failed")
		log.WithTraceContext(ctx).With("namespace", namespace, "digest", d.Hex(), "uid", uid).Errorf("Failed to commit cluster upload: %s", err)
		return s.handleUploadConflict(ctx, err, namespace, d)
	}

	// NOTE(review): the trailing 0 is presumably a write-back delay; confirm
	// against writeBack's signature.
	if err := s.writeBack(ctx, namespace, d, 0); err != nil {
		span.RecordError(err)
		span.SetStatus(codes.Error, "writeback initiation failed")
		log.WithTraceContext(ctx).With("namespace", namespace, "digest", d.Hex()).Errorf("Failed to write back blob: %s", err)
		return err
	}

	// The blob size is only used for logging and span attributes, so a stat
	// failure is deliberately tolerated and the size is reported as 0.
	var blobSize int64
	if fi, err := s.cas.GetCacheFileStat(d.Hex()); err == nil {
		blobSize = fi.Size()
	}
	span.SetAttributes(attribute.Int64("blob.size_bytes", blobSize))

	replicateStart := time.Now()
	log.WithTraceContext(ctx).With("namespace", namespace, "digest", d.Hex(), "size_bytes", blobSize).Debug("Replicating upload to other origins")

	err = s.applyToReplicas(d, func(i int, client blobclient.Client) error {
		replicaStart := time.Now()
		// Each replica gets a progressively larger delay, scaled by its index.
		delay := s.config.DuplicateWriteBackStagger * time.Duration(i+1)

		f, err := s.cas.GetCacheFileReader(d.Hex())
		if err != nil {
			return fmt.Errorf("get cache file: %w", err)
		}
		// Release the cache file handle once this replica's upload finishes,
		// whether it succeeded or not; otherwise every replication attempt
		// leaks an open reader.
		defer f.Close()

		uploadErr := client.DuplicateUploadBlob(namespace, d, f, delay)
		replicaLog := log.WithTraceContext(ctx).With(
			"namespace", namespace,
			"digest", d.Hex(),
			"replica", client.Addr(),
			"size_bytes", blobSize,
			"duration_s", time.Since(replicaStart).Seconds(),
		)
		if uploadErr != nil {
			replicaLog.Errorf("Failed to duplicate upload: %s", uploadErr)
			return fmt.Errorf("duplicate upload: %w", uploadErr)
		}
		replicaLog.Debug("Successfully duplicated upload")
		return nil
	})

	replicateDuration := time.Since(replicateStart)
	span.SetAttributes(attribute.Int64("replication.duration_ms", replicateDuration.Milliseconds()))
	if err != nil {
		// Don't fail the commit if replication fails - blob is still uploaded.
		// The failure remains visible through the replication.error attribute.
		span.SetAttributes(attribute.String("replication.error", err.Error()))
		log.WithTraceContext(ctx).With("namespace", namespace, "digest", d.Hex(), "replication_duration_ms", replicateDuration.Milliseconds()).Errorf("Error duplicating write-back task to replicas: %s", err)
	}

	log.WithTraceContext(ctx).With("namespace", namespace, "digest", d.Hex()).Info("Successfully committed cluster upload")
	span.SetStatus(codes.Ok, "upload committed and replicated")
	return nil
}