func()

in registry/storage/driver/s3-aws/v1/s3.go [1161:1303]


func (w *writer) Write(p []byte) (int, error) {
	ctx := context.Background()

	switch {
	case w.closed:
		return 0, storagedriver.ErrAlreadyClosed
	case w.committed:
		return 0, storagedriver.ErrAlreadyCommited
	case w.canceled:
		return 0, storagedriver.ErrAlreadyCanceled
	}

	// If the last written part is smaller than minChunkSize, we need to make a
	// new multipart upload
	if len(w.parts) > 0 && *w.parts[len(w.parts)-1].Size < common.MinChunkSize {
		var completedUploadedParts completedParts
		for _, part := range w.parts {
			completedUploadedParts = append(completedUploadedParts, &s3.CompletedPart{
				ETag:       part.ETag,
				PartNumber: part.PartNumber,
			})
		}

		sort.Sort(completedUploadedParts)

		_, err := w.driver.S3.CompleteMultipartUploadWithContext(
			ctx,
			&s3.CompleteMultipartUploadInput{
				Bucket:   aws.String(w.driver.Bucket),
				Key:      aws.String(w.key),
				UploadId: aws.String(w.uploadID),
				MultipartUpload: &s3.CompletedMultipartUpload{
					Parts: completedUploadedParts,
				},
			})
		if err != nil {
			_, errIn := w.driver.S3.AbortMultipartUploadWithContext(
				ctx,
				&s3.AbortMultipartUploadInput{
					Bucket:   aws.String(w.driver.Bucket),
					Key:      aws.String(w.key),
					UploadId: aws.String(w.uploadID),
				})
			if errIn != nil {
				return 0, fmt.Errorf("aborting upload failed while handling error %w: %w", err, errIn)
			}
			return 0, err
		}

		resp, err := w.driver.S3.CreateMultipartUploadWithContext(
			ctx,
			&s3.CreateMultipartUploadInput{
				Bucket:               aws.String(w.driver.Bucket),
				Key:                  aws.String(w.key),
				ContentType:          w.driver.getContentType(),
				ACL:                  w.driver.getACL(),
				ServerSideEncryption: w.driver.getEncryptionMode(),
				StorageClass:         w.driver.getStorageClass(),
			})
		if err != nil {
			return 0, err
		}
		w.uploadID = *resp.UploadId

		// If the entire written file is smaller than minChunkSize, we need to make
		// a new part from scratch
		if w.size < common.MinChunkSize {
			resp, err := w.driver.S3.GetObjectWithContext(
				ctx,
				&s3.GetObjectInput{
					Bucket: aws.String(w.driver.Bucket),
					Key:    aws.String(w.key),
				})
			if err != nil {
				return 0, err
			}
			defer resp.Body.Close()
			w.parts = nil
			w.readyPart, err = io.ReadAll(resp.Body)
			if err != nil {
				return 0, err
			}
		} else {
			// Otherwise we can use the old file as the new first part
			copyPartResp, err := w.driver.S3.UploadPartCopyWithContext(
				ctx,
				&s3.UploadPartCopyInput{
					Bucket:     aws.String(w.driver.Bucket),
					CopySource: aws.String(w.driver.Bucket + "/" + w.key),
					Key:        aws.String(w.key),
					PartNumber: aws.Int64(1),
					UploadId:   resp.UploadId,
				})
			if err != nil {
				return 0, err
			}
			w.parts = []*s3.Part{
				{
					ETag:       copyPartResp.CopyPartResult.ETag,
					PartNumber: aws.Int64(1),
					Size:       aws.Int64(w.size),
				},
			}
		}
	}

	var n int64

	for len(p) > 0 {
		// If no parts are ready to write, fill up the first part
		if neededBytes := w.driver.ChunkSize - int64(len(w.readyPart)); neededBytes > 0 {
			if int64(len(p)) >= neededBytes {
				w.readyPart = append(w.readyPart, p[:neededBytes]...)
				n += neededBytes
				p = p[neededBytes:]
			} else {
				w.readyPart = append(w.readyPart, p...)
				n += int64(len(p))
				p = nil
			}
		}

		if neededBytes := w.driver.ChunkSize - int64(len(w.pendingPart)); neededBytes > 0 {
			if int64(len(p)) >= neededBytes {
				w.pendingPart = append(w.pendingPart, p[:neededBytes]...)
				n += neededBytes
				p = p[neededBytes:]
				err := w.flushPart()
				// nolint: revive // max-control-nesting: control flow nesting exceeds 3
				if err != nil {
					w.size += n
					return int(n), err // nolint: gosec // n is never going to be negative
				}
			} else {
				w.pendingPart = append(w.pendingPart, p...)
				n += int64(len(p))
				p = nil
			}
		}
	}
	w.size += n
	return int(n), nil // nolint: gosec // n is never going to be negative
}