in dth/job.go [650:704]
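// migrateBigFile transfers a large object to the destination using an S3
// multipart upload, resuming a previously started upload when one exists.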
func (w *Worker) migrateBigFile(ctx context.Context, obj *Object, destKey *string, transferCh chan struct{}) *TransferResult {
	var err error
	var parts map[int]*Part

	// Check whether a multipart upload already exists for the destination key.
	uploadID := w.desClient.GetUploadID(ctx, destKey)

	// If an upload ID is found, list the parts that were already uploaded so the
	// transfer can resume; otherwise create a new multipart upload.
	if uploadID != nil {
		// log.Printf("Found upload ID %s", *uploadID)
		parts = w.desClient.ListParts(ctx, destKey, uploadID)
	} else {
		// Optionally read the source object's metadata so it can be attached
		// to the new multipart upload.
		var meta *Metadata
		if w.cfg.IncludeMetadata {
			meta = w.srcClient.HeadObject(ctx, &obj.Key)
		}
		uploadID, err = w.desClient.CreateMultipartUpload(ctx, destKey, &w.cfg.DestStorageClass, &w.cfg.DestAcl, meta)
		if err != nil {
			log.Printf("Failed to create upload ID for %s - %s\n", *destKey, err.Error())
			return &TransferResult{
				status: "ERROR",
				err:    err,
			}
		}
	}
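	// Upload the object in parts, passing in any parts that already exist at
	// the destination so the transfer can pick up where it left off.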
	allParts, err := w.startMultipartUpload(ctx, obj, destKey, uploadID, parts, transferCh)
	if err != nil {
		return &TransferResult{
			status: "ERROR",
			err:    err,
		}
	}
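	// All parts have been uploaded; complete the multipart upload so the
	// destination assembles them into the final object.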
	etag, err := w.desClient.CompleteMultipartUpload(ctx, destKey, uploadID, allParts)
	if err != nil {
		log.Printf("Failed to complete upload for %s - %s\n", obj.Key, err.Error())
		w.desClient.AbortMultipartUpload(ctx, destKey, uploadID)
		return &TransferResult{
			status: "ERROR",
			err:    err,
		}
	}
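	// The transfer succeeded; report the resulting ETag to the caller.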
// log.Printf("Completed the transfer of %s with etag %s\n", obj.Key, *etag)
return &TransferResult{
status: "DONE",
etag: etag,
err: nil,
}
}