in dth/job.go [784:838]
func (w *Worker) transfer(ctx context.Context, obj *Object, destKey *string, start, chunkSize int64, uploadID *string, partNumber int, meta *Metadata) (result *TransferResult) {
var etag *string
var err error
if start+chunkSize > obj.Size {
chunkSize = obj.Size - start
}
log.Printf("----->Downloading %d Bytes from %s/%s\n", chunkSize, w.cfg.SrcBucket, obj.Key)
body, err := w.srcClient.GetObject(ctx, &obj.Key, obj.Size, start, chunkSize, "null")
if err != nil {
var ae *types.NoSuchKey
if errors.As(err, &ae) {
log.Printf("No such key %s, the object might be deleted. Cancelling...", obj.Key)
return &TransferResult{
status: "CANCEL",
err: err,
}
}
// status = "ERROR"
return &TransferResult{
status: "ERROR",
err: err,
}
}
// destKey := appendPrefix(&obj.Key, &w.cfg.DestPrefix)
// Use PutObject for single object upload
// Use UploadPart for multipart upload
if uploadID != nil {
log.Printf("----->Uploading %d Bytes to %s/%s - Part %d\n", chunkSize, w.cfg.DestBucket, *destKey, partNumber)
etag, err = w.desClient.UploadPart(ctx, destKey, body, uploadID, partNumber)
} else {
log.Printf("----->Uploading %d Bytes to %s/%s\n", chunkSize, w.cfg.DestBucket, *destKey)
etag, err = w.desClient.PutObject(ctx, destKey, body, &w.cfg.DestStorageClass, &w.cfg.DestAcl, meta)
}
body = nil // release memory
if err != nil {
return &TransferResult{
status: "ERROR",
err: err,
}
}
log.Printf("----->Completed %d Bytes from %s/%s to %s/%s\n", chunkSize, w.cfg.SrcBucket, obj.Key, w.cfg.DestBucket, *destKey)
return &TransferResult{
status: "DONE",
etag: etag,
}
}