in origin/blobserver/server.go [764:819]
func (s *Server) patchClusterUploadHandler(w http.ResponseWriter, r *http.Request) error {
ctx, span := s.tracer.Start(r.Context(), "origin.patch_upload",
trace.WithSpanKind(trace.SpanKindServer),
trace.WithAttributes(
attribute.String("component", "origin"),
attribute.String("operation", "patch_cluster_upload"),
),
)
defer span.End()
d, err := httputil.ParseDigest(r, "digest")
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, "parse digest failed")
return err
}
namespace, err := httputil.ParseParam(r, "namespace")
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, "parse namespace failed")
return err
}
uid, err := httputil.ParseParam(r, "uid")
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, "parse uid failed")
return err
}
start, end, err := parseContentRange(r.Header)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, "parse content range failed")
return err
}
chunkSize := end - start
span.SetAttributes(
attribute.String("namespace", namespace),
attribute.String("blob.digest", d.Hex()),
attribute.String("upload.uid", uid),
attribute.Int64("upload.chunk_start", start),
attribute.Int64("upload.chunk_end", end),
attribute.Int64("upload.chunk_size", chunkSize),
)
log.WithTraceContext(ctx).With("namespace", namespace, "digest", d.Hex(), "uid", uid, "start", start, "end", end).Debug("Patching cluster upload chunk")
if err := s.uploader.patch(d, uid, r.Body, start, end); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, "patch upload failed")
log.WithTraceContext(ctx).With("namespace", namespace, "digest", d.Hex(), "uid", uid).Errorf("Failed to patch cluster upload: %s", err)
return s.handleUploadConflict(ctx, err, namespace, d)
}
log.WithTraceContext(ctx).With("namespace", namespace, "digest", d.Hex(), "uid", uid, "start", start, "end", end).Debug("Successfully patched upload chunk")
span.SetStatus(codes.Ok, "chunk uploaded")
return nil
}