in oss/lib/cp.go [1633:1691]
// uploadFiles copies the sources in srcURLList to the OSS destination destURL.
//
// The work is split across goroutines: cc.fileStatistic runs on its own, one
// cc.fileProducer lists the files into a channel, and cc.cpOption.routines
// instances of cc.uploadConsumer drain that channel. This function then waits
// until the producer and every consumer have reported completion.
//
// Error handling depends on cc.cpOption.ctnu:
//   - when it is false, the first non-nil error from either the producer or a
//     consumer is returned immediately;
//   - when it is true, consumer errors are dropped here, and a listing error is
//     remembered and returned once everything has completed.
//
// It returns nil when no listing error was recorded.
func (cc *CopyCommand) uploadFiles(srcURLList []StorageURLer, destURL CloudURL) error {
	if err := destURL.checkObjectPrefix(); err != nil {
		return err
	}

	bucket, err := cc.command.ossBucket(destURL.bucket)
	if err != nil {
		return err
	}

	// Adjust the OSS prefix name before any worker sees the destination.
	if destURL, err = cc.adjustDestURLForUpload(srcURLList, destURL); err != nil {
		return err
	}

	workers := cc.cpOption.routines

	// fileCh carries listed files from the producer to the upload consumers.
	// uploadErrCh has one slot per consumer; listErrCh has a single slot for
	// the producer's result.
	fileCh := make(chan fileInfoType, ChannelBuf)
	uploadErrCh := make(chan error, workers)
	listErrCh := make(chan error, 1)

	go cc.fileStatistic(srcURLList)
	go cc.fileProducer(srcURLList, fileCh, listErrCh)

	LogInfo("upload files,routin count:%d,multi part size threshold:%d\n",
		workers, cc.cpOption.threshold)

	for w := int64(0); w < workers; w++ {
		go cc.uploadConsumer(bucket, destURL, fileCh, uploadErrCh)
	}

	// One completion is expected from the producer plus one from each consumer.
	var listErr error
	for pending := workers + 1; pending > 0; {
		select {
		case lerr := <-listErrCh:
			if lerr != nil {
				if !cc.cpOption.ctnu {
					// NOTE(review): unlike the upload-error path below, this
					// return does not call closeProgress or print the final
					// progress bar — confirm whether that is intended.
					return lerr
				}
				listErr = lerr
			}
			pending--

		case uerr := <-uploadErrCh:
			switch {
			case uerr == nil:
				// A nil value on this channel is counted as one completion.
				pending--
			case !cc.cpOption.ctnu:
				cc.closeProgress()
				// NOTE(review): the progress bar text is used as the Printf
				// format string; presumably progressBar escapes '%' for this —
				// verify before changing to fmt.Print.
				fmt.Printf(cc.monitor.progressBar(true, errExit))
				return uerr
			}
			// A non-nil error with ctnu set is ignored here and does not
			// count as a completion.
		}
	}

	cc.closeProgress()
	fmt.Printf(cc.monitor.progressBar(true, normalExit))
	return listErr
}