in dth/job.go [723:781]
// startMultipartUpload transfers every part of obj that is not already listed
// in parts and returns the full list of parts in ascending part-number order.
//
// parts maps a 1-based part number to a part that was completed earlier; such
// parts are reused as-is and are not transferred again. Every missing part is
// handed to w.transfer in its own goroutine, using the byte offset
// (partNumber-1)*chunkSize and a length of chunkSize, where the part count and
// chunkSize come from w.getTotalParts(obj.Size).
//
// transferCh is used as a counting semaphore: one token is sent before each
// transfer goroutine is started and received back when that goroutine ends.
// NOTE(review): the channel is supplied by the caller, so its capacity
// (presumably the concurrency limit shared across uploads) is decided there.
//
// The function waits for all started transfers to finish before returning. If
// any transfer failed, one of the transfer errors is returned and the part
// list is nil. If ctx is done while waiting for a free transfer slot, no
// further parts are scheduled and ctx.Err() is returned once the in-flight
// transfers have finished.
func (w *Worker) startMultipartUpload(ctx context.Context, obj *Object, destKey, uploadID *string, parts map[int](*Part), transferCh chan struct{}) ([]*Part, error) {
	totalParts, chunkSize := w.getTotalParts(obj.Size)

	var wg sync.WaitGroup
	// Both channels are buffered to totalParts: each part produces at most one
	// value on exactly one of them, so no sender can ever block.
	partCh := make(chan *Part, totalParts)
	partErrorCh := make(chan error, totalParts)

	// scheduleErr is set when scheduling stops early because ctx is done.
	var scheduleErr error

schedule:
	for i := 0; i < totalParts; i++ {
		partNumber := i + 1

		if part, found := parts[partNumber]; found {
			// Already transferred in a previous attempt; reuse the recorded part.
			partCh <- part
			continue
		}

		// Acquire a transfer slot, but give up if the context ends first so a
		// cancelled job does not keep launching transfers that cannot succeed.
		select {
		case transferCh <- struct{}{}:
		case <-ctx.Done():
			scheduleErr = ctx.Err()
			break schedule
		}

		wg.Add(1)
		go func(partNumber int) {
			defer wg.Done()
			// Deferred so the slot is returned on every exit path.
			defer func() { <-transferCh }()

			// Widen before multiplying so the offset cannot overflow int on
			// 32-bit platforms.
			offset := int64(partNumber-1) * int64(chunkSize)
			result := w.transfer(ctx, obj, destKey, offset, int64(chunkSize), uploadID, partNumber, nil)
			if result.err != nil {
				partErrorCh <- result.err
				return
			}
			partCh <- &Part{
				partNumber: partNumber,
				etag:       result.etag,
			}
		}(partNumber)
	}

	wg.Wait()
	close(partErrorCh)
	close(partCh)

	// Fail if at least one transfer reported an error.
	if err, ok := <-partErrorCh; ok {
		return nil, err
	}
	if scheduleErr != nil {
		return nil, scheduleErr
	}

	// Parts arrive in completion order; place each one by its part number so
	// the returned list is in ascending order.
	allParts := make([]*Part, totalParts)
	for p := range partCh {
		allParts[p.partNumber-1] = p
	}
	return allParts, nil
}