in oss/lib/cp.go [1339:1537]
// RunCommand executes the cp command. It loads the command-line options into
// cc.cpOption, validates them together with the source and destination URLs,
// builds the OSS request options, and then dispatches to uploadFiles,
// downloadFiles or copyFiles according to the detected operation type.
//
// It returns the first validation or setup error encountered; otherwise it
// returns the error (if any) produced by the transfer itself.
func (cc *CopyCommand) RunCommand() error {
	opts := cc.command.options

	// NOTE(review): the error results of the Get* option helpers are discarded
	// here, as in the original code; presumably the options were already
	// validated when the command line was parsed - confirm.
	cc.cpOption.recursive, _ = GetBool(OptionRecursion, opts)
	cc.cpOption.force, _ = GetBool(OptionForce, opts)
	cc.cpOption.update, _ = GetBool(OptionUpdate, opts)
	cc.cpOption.threshold, _ = GetInt(OptionBigFileThreshold, opts)
	cc.cpOption.cpDir, _ = GetString(OptionCheckpointDir, opts)
	cc.cpOption.routines, _ = GetInt(OptionRoutines, opts)

	// ctnu is only ever enabled for recursive operations, and only when the
	// user did not pass the disable-ignore-error option.
	cc.cpOption.ctnu = false
	if cc.cpOption.recursive {
		disableIgnoreError, _ := GetBool(OptionDisableIgnoreError, opts)
		cc.cpOption.ctnu = !disableIgnoreError
	}

	outputDir, _ := GetString(OptionOutputDir, opts)
	cc.cpOption.snapshotPath, _ = GetString(OptionSnapshotPath, opts)
	cc.cpOption.vrange, _ = GetString(OptionRange, opts)
	cc.cpOption.encodingType, _ = GetString(OptionEncodingType, opts)
	cc.cpOption.meta, _ = GetString(OptionMeta, opts)
	cc.cpOption.tagging, _ = GetString(OptionTagging, opts)
	acl, _ := GetString(OptionACL, opts)
	payer, _ := GetString(OptionRequestPayer, opts)
	cc.cpOption.partitionInfo, _ = GetString(OptionPartitionDownload, opts)
	cc.cpOption.versionId, _ = GetString(OptionVersionId, opts)
	cc.cpOption.enableSymlinkDir, _ = GetBool(OptionEnableSymlinkDir, opts)
	cc.cpOption.onlyCurrentDir, _ = GetBool(OptionOnlyCurrentDir, opts)
	cc.cpOption.disableDirObject, _ = GetBool(OptionDisableDirObject, opts)
	cc.cpOption.disableAllSymlink, _ = GetBool(OptionDisableAllSymlink, opts)

	if cc.cpOption.enableSymlinkDir && cc.cpOption.disableAllSymlink {
		return fmt.Errorf("--enable-symlink-dir and --disable-all-symlink can't be both exist")
	}

	// The include/exclude filters are taken from the raw process arguments
	// rather than from the parsed option map.
	var res bool
	res, cc.cpOption.filters = getFilter(os.Args)
	if !res {
		return fmt.Errorf("--include or --exclude does not support format containing dir info")
	}
	if !cc.cpOption.recursive && len(cc.cpOption.filters) > 0 {
		return fmt.Errorf("--include or --exclude only work with --recursive")
	}
	for k, v := range cc.cpOption.filters {
		LogInfo("filter %d,name:%s,pattern:%s\n", k, v.name, v.pattern)
	}

	cc.cpOption.startTime, _ = GetInt(OptionStartTime, opts)
	cc.cpOption.endTime, _ = GetInt(OptionEndTime, opts)
	// An end time that is not positive disables the range check.
	if cc.cpOption.endTime > 0 && cc.cpOption.startTime > cc.cpOption.endTime {
		return fmt.Errorf("start time %d is larger than end time %d", cc.cpOption.startTime, cc.cpOption.endTime)
	}

	// The last argument is the destination; everything before it is a source.
	// NOTE(review): this indexing assumes the argument list is non-empty;
	// presumably the argument count is enforced before RunCommand - confirm.
	args := cc.command.args
	srcURLList, err := cc.getStorageURLs(args[:len(args)-1])
	if err != nil {
		return err
	}
	destURL, err := StorageURLFromString(args[len(args)-1], cc.cpOption.encodingType)
	if err != nil {
		return err
	}

	opType := cc.getCommandType(srcURLList, destURL)
	if err := cc.checkCopyArgs(srcURLList, destURL, opType); err != nil {
		return err
	}
	if err := cc.checkCopyOptions(opType); err != nil {
		return err
	}

	// Build the OSS request options from meta, tagging, ACL, version id and
	// request payer.
	cc.cpOption.options = []oss.Option{}
	if cc.cpOption.meta != "" {
		headers, err := cc.command.parseHeaders(cc.cpOption.meta, false)
		if err != nil {
			return err
		}
		topts, err := cc.command.getOSSOptions(headerOptionMap, headers)
		if err != nil {
			return err
		}
		cc.cpOption.options = append(cc.cpOption.options, topts...)
	}

	if cc.cpOption.tagging != "" {
		if opType == operationTypeGet {
			return fmt.Errorf("No need to set tagging for download")
		}
		tags, err := cc.command.getOSSTagging(cc.cpOption.tagging)
		if err != nil {
			return err
		}
		cc.cpOption.options = append(cc.cpOption.options, oss.SetTagging(oss.Tagging{Tags: tags}))
	}

	if acl != "" {
		if opType == operationTypeGet {
			return fmt.Errorf("No need to set ACL for download")
		}
		var opAcl oss.ACLType
		if opAcl, err = cc.command.checkACL(acl, objectACL); err != nil {
			return err
		}
		cc.cpOption.options = append(cc.cpOption.options, oss.ObjectACL(opAcl))
	}

	if cc.cpOption.versionId != "" {
		cc.cpOption.options = append(cc.cpOption.options, oss.VersionId(cc.cpOption.versionId))
	}

	if payer != "" {
		// Only the lower-cased form of oss.Requester is accepted.
		if payer != strings.ToLower(string(oss.Requester)) {
			return fmt.Errorf("invalid request payer: %s, please check", payer)
		}
		payerOpt := oss.RequestPayer(oss.PayerType(payer))
		cc.cpOption.options = append(cc.cpOption.options, payerOpt)
		cc.cpOption.payerOptions = append(cc.cpOption.payerOptions, payerOpt)
	}

	// Init reporter.
	if cc.cpOption.reporter, err = GetReporter(cc.cpOption.recursive, outputDir, commandLine); err != nil {
		return err
	}

	// Create checkpoint dir.
	if err := os.MkdirAll(cc.cpOption.cpDir, 0755); err != nil {
		return err
	}

	// Load snapshot; the database stays open until RunCommand returns.
	if cc.cpOption.snapshotPath != "" {
		if cc.cpOption.snapshotldb, err = leveldb.OpenFile(cc.cpOption.snapshotPath, nil); err != nil {
			return fmt.Errorf("load snapshot error, reason: %s", err.Error())
		}
		defer cc.cpOption.snapshotldb.Close()
	}

	// Partition download is given as "<index>:<count>" with
	// 1 <= index <= count, and is only accepted for downloads.
	if cc.cpOption.partitionInfo == "" {
		cc.cpOption.partitionIndex = 0
		cc.cpOption.partitionCount = 0
	} else {
		if opType != operationTypeGet {
			return fmt.Errorf("PutObject or CopyObject doesn't support option OptionPartitionDownload")
		}
		partitionErr := fmt.Errorf("parsar OptionPartitionDownload error,value is:%s", cc.cpOption.partitionInfo)
		sliceInfo := strings.Split(cc.cpOption.partitionInfo, ":")
		if len(sliceInfo) != 2 {
			return partitionErr
		}
		partitionIndex, err1 := strconv.Atoi(sliceInfo[0])
		partitionCount, err2 := strconv.Atoi(sliceInfo[1])
		if err1 != nil || err2 != nil {
			return partitionErr
		}
		if partitionIndex < 1 || partitionCount < 1 || partitionIndex > partitionCount {
			return partitionErr
		}
		cc.cpOption.partitionIndex = partitionIndex
		cc.cpOption.partitionCount = partitionCount
	}

	cc.monitor.init(opType)
	cc.cpOption.opType = opType

	// chProgressSignal is assigned (not declared) here, so it lives outside
	// this function. NOTE(review): the progressBar goroutine is not stopped or
	// awaited in this function; presumably it is signalled through
	// chProgressSignal elsewhere - confirm.
	chProgressSignal = make(chan chProgressSignalType, 10)
	go cc.progressBar()

	// Timestamps are in milliseconds.
	startT := time.Now().UnixNano() / 1000 / 1000
	switch opType {
	case operationTypePut:
		LogInfo("begin uploadFiles\n")
		err = cc.uploadFiles(srcURLList, destURL.(CloudURL))
	case operationTypeGet:
		LogInfo("begin downloadFiles\n")
		err = cc.downloadFiles(srcURLList[0].(CloudURL), destURL.(FileURL))
	default:
		LogInfo("begin copyFiles\n")
		err = cc.copyFiles(srcURLList[0].(CloudURL), destURL.(CloudURL))
	}
	endT := time.Now().UnixNano() / 1000 / 1000

	if elapsedMs := endT - startT; elapsedMs > 0 {
		// bytes * 1000 / elapsedMs, computed as quotient and remainder parts so
		// that the integer division does not truncate before scaling (which
		// previously reported 0 for transfers smaller than the elapsed
		// millisecond count) and the byte count is never multiplied directly.
		transferred := cc.monitor.transferSize
		averSpeed := (transferred/elapsedMs)*1000 + (transferred%elapsedMs)*1000/elapsedMs
		fmt.Printf("\naverage speed %d(byte/s)\n", averSpeed)
		LogInfo("average speed %d(byte/s)\n", averSpeed)
	}

	cc.cpOption.reporter.Clear()

	// Remove the checkpoint directory only when the transfer succeeded and the
	// directory was successfully listed as empty. ioutil.ReadDir returns a nil
	// list on failure, so the read error must be checked or an unreadable,
	// non-empty checkpoint directory would be treated as empty and deleted.
	ckFiles, readErr := ioutil.ReadDir(cc.cpOption.cpDir)
	if err == nil && readErr == nil && len(ckFiles) == 0 {
		LogInfo("begin Remove checkpointDir %s\n", cc.cpOption.cpDir)
		os.RemoveAll(cc.cpOption.cpDir)
	}
	return err
}