func()

in oss/lib/cp.go [1633:1691]


// uploadFiles copies the sources in srcURLList to destURL through a
// producer/consumer pipeline.
//
// After validating the destination prefix, resolving the bucket and adjusting
// the destination URL, it starts one fileStatistic goroutine, one fileProducer
// goroutine feeding chFiles, and cc.cpOption.routines uploadConsumer
// goroutines draining chFiles. It then waits for routines+1 completion
// signals: each receive from chListError and each nil received from chError
// counts as one.
//
// Error handling depends on cc.cpOption.ctnu:
//   - when it is false, the first non-nil error from either channel is
//     returned immediately (a consumer error additionally closes the progress
//     display and prints the final bar with errExit);
//   - when it is true, consumer errors are dropped here, and a listing error
//     is remembered and returned once everything has completed.
func (cc *CopyCommand) uploadFiles(srcURLList []StorageURLer, destURL CloudURL) error {
	prefixErr := destURL.checkObjectPrefix()
	if prefixErr != nil {
		return prefixErr
	}

	bucket, err := cc.command.ossBucket(destURL.bucket)
	if err != nil {
		return err
	}

	// The destination prefix may need adjusting for the given sources.
	if destURL, err = cc.adjustDestURLForUpload(srcURLList, destURL); err != nil {
		return err
	}

	// chFiles carries work from the producer to the consumers; chError has one
	// slot per consumer and chListError a single slot for the producer.
	chFiles := make(chan fileInfoType, ChannelBuf)
	chError := make(chan error, cc.cpOption.routines)
	chListError := make(chan error, 1)

	go cc.fileStatistic(srcURLList)
	go cc.fileProducer(srcURLList, chFiles, chListError)

	LogInfo("upload files,routin count:%d,multi part size threshold:%d\n",
		cc.cpOption.routines, cc.cpOption.threshold)
	for worker := int64(0); worker < cc.cpOption.routines; worker++ {
		go cc.uploadConsumer(bucket, destURL, chFiles, chError)
	}

	// Wait for routines+1 completions; hence "<=" rather than "<".
	var (
		finished    int64
		deferredErr error
	)
	for finished <= cc.cpOption.routines {
		select {
		case listErr := <-chListError:
			if listErr != nil {
				if !cc.cpOption.ctnu {
					return listErr
				}
				// Continue mode: report the listing failure only at the end.
				deferredErr = listErr
			}
			finished++
		case workErr := <-chError:
			switch {
			case workErr == nil:
				finished++
			case !cc.cpOption.ctnu:
				cc.closeProgress()
				// NOTE(review): the progress bar text is used as the Printf
				// format string, so any '%' in it is treated as a verb —
				// confirm progressBar escapes its output accordingly.
				fmt.Printf(cc.monitor.progressBar(true, errExit))
				return workErr
			}
		}
	}

	cc.closeProgress()
	fmt.Printf(cc.monitor.progressBar(true, normalExit))
	return deferredErr
}