in registry/storage/driver/s3-aws/v1/s3.go [1161:1303]
func (w *writer) Write(p []byte) (int, error) {
ctx := context.Background()
switch {
case w.closed:
return 0, storagedriver.ErrAlreadyClosed
case w.committed:
return 0, storagedriver.ErrAlreadyCommited
case w.canceled:
return 0, storagedriver.ErrAlreadyCanceled
}
// S3 requires every part except the last to meet a minimum part size. If the
// last part written so far is smaller than minChunkSize, it cannot remain an
// intermediate part once more data is appended, so we have to complete the
// current multipart upload and start a new one.
if len(w.parts) > 0 && *w.parts[len(w.parts)-1].Size < common.MinChunkSize {
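// Completing the current multipart upload merges the parts uploaded so far
// into a single object under w.key; that object is then used below as the
// starting content of a fresh upload.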
var completedUploadedParts completedParts
for _, part := range w.parts {
completedUploadedParts = append(completedUploadedParts, &s3.CompletedPart{
ETag: part.ETag,
PartNumber: part.PartNumber,
})
}
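// CompleteMultipartUpload requires the parts to be listed in ascending part
// number order.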
sort.Sort(completedUploadedParts)
_, err := w.driver.S3.CompleteMultipartUploadWithContext(
ctx,
&s3.CompleteMultipartUploadInput{
Bucket: aws.String(w.driver.Bucket),
Key: aws.String(w.key),
UploadId: aws.String(w.uploadID),
MultipartUpload: &s3.CompletedMultipartUpload{
Parts: completedUploadedParts,
},
})
if err != nil {
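// Completing failed; abort the upload so the parts uploaded so far are
// cleaned up rather than left behind in the bucket.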
_, errIn := w.driver.S3.AbortMultipartUploadWithContext(
ctx,
&s3.AbortMultipartUploadInput{
Bucket: aws.String(w.driver.Bucket),
Key: aws.String(w.key),
UploadId: aws.String(w.uploadID),
})
if errIn != nil {
return 0, fmt.Errorf("aborting upload failed while handling error %w: %w", err, errIn)
}
return 0, err
}
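// Start a fresh multipart upload for the same key. The data written so far is
// carried over either by re-reading the object into memory (small case) or
// via UploadPartCopy (large case) below.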
resp, err := w.driver.S3.CreateMultipartUploadWithContext(
ctx,
&s3.CreateMultipartUploadInput{
Bucket: aws.String(w.driver.Bucket),
Key: aws.String(w.key),
ContentType: w.driver.getContentType(),
ACL: w.driver.getACL(),
ServerSideEncryption: w.driver.getEncryptionMode(),
StorageClass: w.driver.getStorageClass(),
})
if err != nil {
return 0, err
}
w.uploadID = *resp.UploadId
// If everything written so far is smaller than minChunkSize, the object is
// too small to be reused as a non-final part via UploadPartCopy, so read it
// back and rebuild the first part from scratch in memory.
if w.size < common.MinChunkSize {
resp, err := w.driver.S3.GetObjectWithContext(
ctx,
&s3.GetObjectInput{
Bucket: aws.String(w.driver.Bucket),
Key: aws.String(w.key),
})
if err != nil {
return 0, err
}
defer resp.Body.Close()
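// Discard the old part list and buffer the object's entire contents in
// readyPart so it is re-uploaded as part of the new multipart upload.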
w.parts = nil
w.readyPart, err = io.ReadAll(resp.Body)
if err != nil {
return 0, err
}
} else {
// Otherwise copy the existing object in place as the first part of the new
// upload; UploadPartCopy performs the copy server-side, so the data is not
// re-downloaded.
copyPartResp, err := w.driver.S3.UploadPartCopyWithContext(
ctx,
&s3.UploadPartCopyInput{
Bucket: aws.String(w.driver.Bucket),
CopySource: aws.String(w.driver.Bucket + "/" + w.key),
Key: aws.String(w.key),
PartNumber: aws.Int64(1),
UploadId: resp.UploadId,
})
if err != nil {
return 0, err
}
w.parts = []*s3.Part{
{
ETag: copyPartResp.CopyPartResult.ETag,
PartNumber: aws.Int64(1),
Size: aws.Int64(w.size),
},
}
}
}
var n int64
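// Double-buffer the incoming bytes: readyPart is filled up to ChunkSize
// first, then pendingPart; flushPart is only invoked once both buffers hold a
// full chunk, so Write never flushes an undersized part. Any short remainder
// stays buffered for a later flush.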
for len(p) > 0 {
// Fill readyPart first, up to ChunkSize
if neededBytes := w.driver.ChunkSize - int64(len(w.readyPart)); neededBytes > 0 {
if int64(len(p)) >= neededBytes {
w.readyPart = append(w.readyPart, p[:neededBytes]...)
n += neededBytes
p = p[neededBytes:]
} else {
w.readyPart = append(w.readyPart, p...)
n += int64(len(p))
p = nil
}
}
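// Whatever is left once readyPart is full overflows into pendingPart.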
if neededBytes := w.driver.ChunkSize - int64(len(w.pendingPart)); neededBytes > 0 {
if int64(len(p)) >= neededBytes {
w.pendingPart = append(w.pendingPart, p[:neededBytes]...)
n += neededBytes
p = p[neededBytes:]
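// Both buffers now hold a full chunk; flush buffered data to S3 as a new part.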
err := w.flushPart()
// nolint: revive // max-control-nesting: control flow nesting exceeds 3
if err != nil {
w.size += n
return int(n), err // nolint: gosec // n is never going to be negative
}
} else {
w.pendingPart = append(w.pendingPart, p...)
n += int64(len(p))
p = nil
}
}
}
w.size += n
return int(n), nil // nolint: gosec // n is never going to be negative
}