oss/lib/sync.go (795 lines of code) (raw):
package lib
import (
"bytes"
"fmt"
"io"
"os"
"path/filepath"
"sort"
"strings"
oss "github.com/aliyun/aliyun-oss-go-sdk/oss"
)
/*
* Keep variables of the same type together so that they stay 64-bit aligned and
* atomic.AddInt64() does not panic.
* Please preserve this alignment if you add a new field.
*/
// MaxSyncNumbers is the largest number of keys (local files or OSS objects)
// that the key readers of the sync command collect for one side; once the
// collected key map grows beyond it they report an "over max sync numbers"
// error.
var MaxSyncNumbers = 1000000
// syncOptionType holds the option values and running state of one sync
// command invocation. The fields are filled in by SyncCommand.RunCommand.
type syncOptionType struct {
// bDelete is the value of OptionDelete: remove destination entries that do
// not exist on the source.
bDelete bool
// encodingType is the value of OptionEncodingType, passed on when parsing
// the source and destination URLs.
encodingType string
// backupDir is the value of OptionBackupDir; CheckDestBackupDir appends a
// trailing path separator to it.
backupDir string
// force is the value of OptionForce; when set, confirm() skips the prompt.
force bool
// enableSymlinkDir, onlyCurrentDir, disableDirObject and disableAllSymlink
// mirror the options of the same name and are used when listing files.
enableSymlinkDir bool
onlyCurrentDir bool
disableDirObject bool
disableAllSymlink bool
// cpDir is the value of OptionCheckpointDir; it is handed to
// copyCommand.filterFile while collecting local keys.
cpDir string
// removeCount counts the paths successfully moved by movePath.
removeCount int
// filters are the include/exclude filters parsed from os.Args.
filters []filterOptionType
// payerOptions carries the request-payer option, if one was given.
payerOptions []oss.Option
}
// specChineseSync is the Chinese help text of the sync command: synopsis,
// parameter summary, syntax, detailed description and usage samples.
var specChineseSync = SpecText{
synopsisText: "将本地文件目录或者oss prefix从源端同步到目的端",
paramText: "src dest [options]",
syntaxText: `
ossutil sync local_dir cloud_url [-f] [-u] [--delete] [--backup-dir] [--enable-symlink-dir] [--disable-all-symlink] [--disable-ignore-error] [--only-current-dir] [--output-dir=odir] [--bigfile-threshold=size] [--checkpoint-dir=cdir] [--snapshot-path=sdir] [--payer requester]
ossutil sync cloud_url local_dir [-f] [-u] [--delete] [--backup-dir] [--only-current-dir] [--disable-ignore-error] [--output-dir=odir] [--bigfile-threshold=size] [--checkpoint-dir=cdir] [--range=x-y] [--payer requester]
ossutil sync cloud_url cloud_url [-f] [-u] [--delete] [--backup-dir] [--only-current-dir] [--disable-ignore-error] [--output-dir=odir] [--bigfile-threshold=size] [--checkpoint-dir=cdir] [--payer requester]
`,
detailHelpText: `
该命令和cp命令类似:支持从本地文件系统上传文件到oss,从oss下载object到本地文件系统,在oss
上进行object拷贝; 用于源端和目的端数据同步.
sync命令和cp命令不同之处如下:
1、当输入--delete选项时,该命令会自动删除目的端存在而源端不存在的object或者移走本地文件
如果目的端是oss, 则会删除多余的object; 如果目的端是本地文件, 则会移走本地多余的文件
警告: 在没有完全搞清楚sync命令的行为之前, 请慎用--delete
2、sync强制是以recursive方式遍历文件或者object, 所以不用输入-r --recursive
3、当源端是oss://bucket/prefix, sync命令会自动在prefix后面加上字符 '/', 但是cp命令不会
当目的端是oss://bucket/prefix, sync命令会在prefix后面加上字符 '/'; cp命令在--recursive被输入时也会在prefix后面加上字符'/'
--delete选项
表示需要删除或者移走目的端存在而源端不存在的object或者文件
--backup-dir
该选项表示用于备份目的端文件的目录, 不能是目的端目录的子目录,如果输入了--delete, 该选项必须输入
其他选项说明、用法和cp命令相同
`,
sampleText: `
1) 从本地文件上传到oss
前置条件如下:
[root]# ls -al ./test_sync
total 16
drwxr-xr-x 2 root root 4096 Oct 5 13:09 .
drwxr-xr-x 5 root root 4096 Oct 5 13:10 ..
-rw-r--r-- 1 root root 38 Oct 5 12:48 a.txt
-rw-r--r-- 1 root root 118 Oct 5 12:48 b.txt
[root]# ./ossutil ls oss://wangtw-test-sync2
LastModifiedTime Size(B) StorageClass ETAG ObjectName
2020-10-05 13:10:23 +0800 CST 156 Standard D282A160AB960C11ED797F009858D41D oss://wangtw-test-sync2/a.txt
2020-10-05 13:09:06 +0800 CST 21029 Standard 975F9F8EC2B34B15936FDAE1C85E71C9 oss://wangtw-test-sync2/c.txt
Object Number is: 2
# run ossutil sync command
[root]# ./ossutil64 sync ./test_sync oss://wangtw-test-sync2/ --delete -f
total file(directory) count:2
oss://wangtw-test-sync2,total oss object count:2
object will be deleted count:1
Succeed: Total num: 2, size: 156. OK num: 2(upload 2 files).
average speed 6000(byte/s)
delete object count:1
0.111655(s) elapsed
[root]# ./ossutil64 ls oss://wangtw-test-sync2
LastModifiedTime Size(B) StorageClass ETAG ObjectName
2020-10-05 13:13:21 +0800 CST 38 Standard 533F2D421ED096DBB4CF458AADA64F8E oss://wangtw-test-sync2/a.txt
2020-10-05 13:13:21 +0800 CST 118 Standard C6639C466B70758C82596413C2BB050A oss://wangtw-test-sync2/b.txt
Object Number is: 2
执行sync命令之后, 本地文件a.txt, b.txt被上传到oss, oss object key c.txt 被删除
2) 从oss上下载
前置条件如下:
[root]# ls -al ./test_sync
total 36
drwxr-xr-x 2 root root 4096 Oct 5 12:42 .
drwxr-xr-x 4 root root 4096 Oct 5 12:42 ..
-rw-r--r-- 1 root root 38 Oct 5 11:22 a.txt
-rw-r--r-- 1 root root 21029 Oct 5 11:22 d.txt
[root]# ./ossutil64 ls oss://wangtw-test-sync2
LastModifiedTime Size(B) StorageClass ETAG ObjectName
2020-10-05 11:55:47 +0800 CST 38 Standard 533F2D421ED096DBB4CF458AADA64F8E oss://wangtw-test-sync2/a.txt
2020-10-05 11:55:47 +0800 CST 118 Standard C6639C466B70758C82596413C2BB050A oss://wangtw-test-sync2/b.txt
2020-10-05 11:55:47 +0800 CST 21029 Standard 975F9F8EC2B34B15936FDAE1C85E71C9 oss://wangtw-test-sync2/c.txt
Object Number is: 3
# run ossutil sync command
[root]# ./ossutil64 sync oss://wangtw-test-sync2 ./test_sync --delete -f --backup-dir backup
oss://wangtw-test-sync2,total oss object count:3
total file(directory) count:2
file(directory) will be removed count:1
Succeed: Total num: 3, size: 21,185. OK num: 3(download 3 objects).
average speed 243000(byte/s)
remove file(dir) count:1
0.172867(s) elapsed
[root]# ls -al ./test_sync/
total 40
drwxr-xr-x 2 root root 4096 Oct 5 12:48 .
drwxr-xr-x 4 root root 4096 Oct 5 12:48 ..
-rw-r--r-- 1 root root 38 Oct 5 12:48 a.txt
-rw-r--r-- 1 root root 118 Oct 5 12:48 b.txt
-rw-r--r-- 1 root root 21029 Oct 5 12:48 c.txt
[root]# ls -al ./backup/
total 32
drwxr-xr-x 2 root root 4096 Oct 5 12:48 .
drwxr-xr-x 4 root root 4096 Oct 5 12:48 ..
-rw-r--r-- 1 root root 21029 Oct 5 11:22 d.txt
执行sync命令之后, oss object keys a.txt, b.txt, c.txt被下载下来
本地文件d.txt被移动到备份目录backup
3) 在oss之间copy
前置条件如下:
[root]# ./ossutil64 ls oss://wangtw-test-sync2
LastModifiedTime Size(B) StorageClass ETAG ObjectName
2020-10-05 12:55:47 +0800 CST 38 Standard 533F2D421ED096DBB4CF458AADA64F8E oss://wangtw-test-sync2/prefix1/a.txt
2020-10-05 12:55:47 +0800 CST 118 Standard C6639C466B70758C82596413C2BB050A oss://wangtw-test-sync2/prefix1/b.txt
2020-10-05 13:00:58 +0800 CST 65 Standard 77F476710889A7FAA2603093B08EB31B oss://wangtw-test-sync2/prefix2/a.txt
2020-10-05 12:56:23 +0800 CST 21029 Standard 975F9F8EC2B34B15936FDAE1C85E71C9 oss://wangtw-test-sync2/prefix2/c.txt
Object Number is: 4
0.067659(s) elapsed
[root]# ./ossutil64 sync oss://wangtw-test-sync2/prefix1 oss://wangtw-test-sync2/prefix2 --delete -f
oss://wangtw-test-sync2/prefix1/,total oss object count:2
oss://wangtw-test-sync2/prefix2/,total oss object count:2
object will be deleted count:1
Succeed: Total num: 2, size: 156. OK num: 2(copy 2 objects).
average speed 5000(byte/s)
delete object count:1
0.071010(s) elapsed
[root]# ./ossutil64 ls oss://wangtw-test-sync2
LastModifiedTime Size(B) StorageClass ETAG ObjectName
2020-10-05 12:55:47 +0800 CST 38 Standard 533F2D421ED096DBB4CF458AADA64F8E oss://wangtw-test-sync2/prefix1/a.txt
2020-10-05 12:55:47 +0800 CST 118 Standard C6639C466B70758C82596413C2BB050A oss://wangtw-test-sync2/prefix1/b.txt
2020-10-05 13:02:02 +0800 CST 38 Standard 533F2D421ED096DBB4CF458AADA64F8E oss://wangtw-test-sync2/prefix2/a.txt
2020-10-05 13:02:02 +0800 CST 118 Standard C6639C466B70758C82596413C2BB050A oss://wangtw-test-sync2/prefix2/b.txt
Object Number is: 4
执行sync命令之后, oss object key prefix1/a.txt, prefix1/b.txt 被拷贝到
prefix2/a.txt, prefix2/b.txt 并且 prefix2/c.txt 被删除
`,
}
// specEnglishSync is the English help text of the sync command: synopsis,
// parameter summary, syntax, detailed description and usage samples.
var specEnglishSync = SpecText{
synopsisText: "Sync the local file directory or oss prefix from the source to the destination",
paramText: "src dest [options]",
syntaxText: `
ossutil sync local_dir cloud_url [-f] [-u] [--delete] [--backup-dir] [--enable-symlink-dir] [--disable-all-symlink] [--disable-ignore-error] [--only-current-dir] [--output-dir=odir] [--bigfile-threshold=size] [--checkpoint-dir=cdir] [--snapshot-path=sdir] [--payer requester]
ossutil sync cloud_url local_dir [-f] [-u] [--delete] [--backup-dir] [--only-current-dir] [--disable-ignore-error] [--output-dir=odir] [--bigfile-threshold=size] [--checkpoint-dir=cdir] [--range=x-y] [--payer requester]
ossutil sync cloud_url cloud_url [-f] [-u] [--delete] [--backup-dir] [--only-current-dir] [--disable-ignore-error] [--output-dir=odir] [--bigfile-threshold=size] [--checkpoint-dir=cdir] [--payer requester]
`,
detailHelpText: `
This command is similar to the cp command: it supports uploading files from the local file
system to oss, and downloading objects from oss to the local file system, Copying objects between
oss; It's used for data synchronization between source and destination.
The differences between the sync command and the cp command are as follows:
1、When the --delete option is entered, the command will automatically delete objects or remove files
that exist on the destination but not exist on the source.
If the destination is oss, the redundant objects will be deleted;
If the destination is local directory, the redundant files will be removed to back up directory.
Warning: Before you fully understand the behavior of the sync command, please use --delete carefully
2、Sync is forced to traverse files or objects recursively, so there is no need to enter -r or --recursive
3、When the src is oss://bucket/prefix, sync command will add character '/' after the prefix, but cp command doesn't add
When the destination is oss://bucket/prefix, sync command will add character '/' after the prefix; The cp command will also add '/' after the prefix with --recursive is entered
--delete
Indicates that the objects or files which exist on destination
and not exist on src needs to be deleted or removed to backup dir
--backup-dir
This option indicates the directory used to back up the destination files needed to be removed,
It cannot be a subdirectory of the destination directory.
If you enter --delete, this option must be entered
Other options descriptions and usage are the same as the cp command
`,
sampleText: `
1) Upload to oss
The preconditions are as follows:
[root]# ls -al ./test_sync
total 16
drwxr-xr-x 2 root root 4096 Oct 5 13:09 .
drwxr-xr-x 5 root root 4096 Oct 5 13:10 ..
-rw-r--r-- 1 root root 38 Oct 5 12:48 a.txt
-rw-r--r-- 1 root root 118 Oct 5 12:48 b.txt
[root]# ./ossutil64 ls oss://wangtw-test-sync2
LastModifiedTime Size(B) StorageClass ETAG ObjectName
2020-10-05 13:10:23 +0800 CST 156 Standard D282A160AB960C11ED797F009858D41D oss://wangtw-test-sync2/a.txt
2020-10-05 13:09:06 +0800 CST 21029 Standard 975F9F8EC2B34B15936FDAE1C85E71C9 oss://wangtw-test-sync2/c.txt
Object Number is: 2
# run ossutil sync command
[root]# ./ossutil64 sync ./test_sync oss://wangtw-test-sync2/ --delete -f
total file(directory) count:2
oss://wangtw-test-sync2,total oss object count:2
object will be deleted count:1
Succeed: Total num: 2, size: 156. OK num: 2(upload 2 files).
average speed 6000(byte/s)
delete object count:1
0.111655(s) elapsed
[root]# ./ossutil64 ls oss://wangtw-test-sync2/
LastModifiedTime Size(B) StorageClass ETAG ObjectName
2020-10-05 13:13:21 +0800 CST 38 Standard 533F2D421ED096DBB4CF458AADA64F8E oss://wangtw-test-sync2/a.txt
2020-10-05 13:13:21 +0800 CST 118 Standard C6639C466B70758C82596413C2BB050A oss://wangtw-test-sync2/b.txt
Object Number is: 2
After the sync command is executed, local files a.txt, b.txt are uploaded and oss object
key c.txt is deleted
2) download from oss
The preconditions are as follows:
[root]# ls -al ./test_sync
total 36
drwxr-xr-x 2 root root 4096 Oct 5 12:42 .
drwxr-xr-x 4 root root 4096 Oct 5 12:42 ..
-rw-r--r-- 1 root root 38 Oct 5 11:22 a.txt
-rw-r--r-- 1 root root 21029 Oct 5 11:22 d.txt
[root]# ./ossutil64 ls oss://wangtw-test-sync2
LastModifiedTime Size(B) StorageClass ETAG ObjectName
2020-10-05 11:55:47 +0800 CST 38 Standard 533F2D421ED096DBB4CF458AADA64F8E oss://wangtw-test-sync2/a.txt
2020-10-05 11:55:47 +0800 CST 118 Standard C6639C466B70758C82596413C2BB050A oss://wangtw-test-sync2/b.txt
2020-10-05 11:55:47 +0800 CST 21029 Standard 975F9F8EC2B34B15936FDAE1C85E71C9 oss://wangtw-test-sync2/c.txt
Object Number is: 3
# run ossutil sync command
[root]# ./ossutil64 sync oss://wangtw-test-sync2 ./test_sync --delete -f --backup-dir backup
oss://wangtw-test-sync2,total oss object count:3
total file(directory) count:2
file(directory) will be removed count:1
Succeed: Total num: 3, size: 21,185. OK num: 3(download 3 objects).
average speed 243000(byte/s)
remove file(dir) count:1
0.172867(s) elapsed
[root]# ls -al ./test_sync/
total 40
drwxr-xr-x 2 root root 4096 Oct 5 12:48 .
drwxr-xr-x 4 root root 4096 Oct 5 12:48 ..
-rw-r--r-- 1 root root 38 Oct 5 12:48 a.txt
-rw-r--r-- 1 root root 118 Oct 5 12:48 b.txt
-rw-r--r-- 1 root root 21029 Oct 5 12:48 c.txt
[root]# ls -al ./backup/
total 32
drwxr-xr-x 2 root root 4096 Oct 5 12:48 .
drwxr-xr-x 4 root root 4096 Oct 5 12:48 ..
-rw-r--r-- 1 root root 21029 Oct 5 11:22 d.txt
After the sync command is executed, oss keys a.txt, b.txt, c.txt are downloaded
and local file d.txt is removed to backup dir
3) copy between oss
The preconditions are as follows:
[root]# ./ossutil64 ls oss://wangtw-test-sync2
LastModifiedTime Size(B) StorageClass ETAG ObjectName
2020-10-05 12:55:47 +0800 CST 38 Standard 533F2D421ED096DBB4CF458AADA64F8E oss://wangtw-test-sync2/prefix1/a.txt
2020-10-05 12:55:47 +0800 CST 118 Standard C6639C466B70758C82596413C2BB050A oss://wangtw-test-sync2/prefix1/b.txt
2020-10-05 13:00:58 +0800 CST 65 Standard 77F476710889A7FAA2603093B08EB31B oss://wangtw-test-sync2/prefix2/a.txt
2020-10-05 12:56:23 +0800 CST 21029 Standard 975F9F8EC2B34B15936FDAE1C85E71C9 oss://wangtw-test-sync2/prefix2/c.txt
Object Number is: 4
0.067659(s) elapsed
[root]# ./ossutil64 sync oss://wangtw-test-sync2/prefix1 oss://wangtw-test-sync2/prefix2 --delete -f
oss://wangtw-test-sync2/prefix1/,total oss object count:2
oss://wangtw-test-sync2/prefix2/,total oss object count:2
object will be deleted count:1
Succeed: Total num: 2, size: 156. OK num: 2(copy 2 objects).
average speed 5000(byte/s)
delete object count:1
0.071010(s) elapsed
[root]# ./ossutil ls oss://wangtw-test-sync2
LastModifiedTime Size(B) StorageClass ETAG ObjectName
2020-10-05 12:55:47 +0800 CST 38 Standard 533F2D421ED096DBB4CF458AADA64F8E oss://wangtw-test-sync2/prefix1/a.txt
2020-10-05 12:55:47 +0800 CST 118 Standard C6639C466B70758C82596413C2BB050A oss://wangtw-test-sync2/prefix1/b.txt
2020-10-05 13:02:02 +0800 CST 38 Standard 533F2D421ED096DBB4CF458AADA64F8E oss://wangtw-test-sync2/prefix2/a.txt
2020-10-05 13:02:02 +0800 CST 118 Standard C6639C466B70758C82596413C2BB050A oss://wangtw-test-sync2/prefix2/b.txt
Object Number is: 4
After the sync command is executed, oss keys prefix1/a.txt, prefix1/b.txt are copied to
prefix2/a.txt, prefix2/b.txt and prefix2/c.txt is deleted
`,
}
// SyncCommand is the sync command. It uploads, downloads and copies objects
// by delegating the transfer to copyCommand and, when the delete option is
// set, removes destination entries that do not exist on the source.
type SyncCommand struct {
// command is the generic command state (name, arguments, parsed options).
command Command
// syncOption holds the sync-specific option values read in RunCommand.
syncOption syncOptionType
}
// syncCommand is the package-level instance of the sync command. It takes
// exactly two arguments (source and destination) and lists every option
// name the command accepts.
var syncCommand = SyncCommand{
command: Command{
name: "sync",
nameAlias: []string{"sync"},
minArgc: 2,
maxArgc: 2,
specChinese: specChineseSync,
specEnglish: specEnglishSync,
group: GroupTypeNormalCommand,
validOptionNames: []string{
// The following options are supported by both the sync command and the cp command.
// OptionRecursion is not listed: Init always forces recursion on for the underlying cp command.
//OptionRecursion,
OptionForce,
OptionUpdate,
OptionContinue,
OptionOutputDir,
OptionBigFileThreshold,
OptionPartSize,
OptionCheckpointDir,
OptionRange,
OptionEncodingType,
OptionInclude,
OptionExclude,
OptionMeta,
OptionACL,
OptionConfigFile,
OptionEndpoint,
OptionAccessKeyID,
OptionAccessKeySecret,
OptionSTSToken,
OptionProxyHost,
OptionProxyUser,
OptionProxyPwd,
OptionRetryTimes,
OptionRoutines,
OptionParallel,
OptionSnapshotPath,
OptionDisableCRC64,
OptionRequestPayer,
OptionLogLevel,
OptionMaxUpSpeed,
//OptionPartitionDownload,
//OptionVersionId,
OptionLocalHost,
OptionEnableSymlinkDir,
OptionOnlyCurrentDir,
OptionDisableDirObject,
OptionDisableAllSymlink,
OptionDisableIgnoreError,
OptionTagging,
OptionPassword,
OptionMode,
OptionECSRoleName,
OptionTokenTimeout,
OptionRamRoleArn,
OptionRoleSessionName,
OptionExternalId,
OptionReadTimeout,
OptionConnectTimeout,
OptionSTSRegion,
OptionSkipVerifyCert,
OptionMaxDownSpeed,
OptionUserAgent,
OptionSignVersion,
OptionRegion,
OptionCloudBoxID,
OptionForcePathStyle,
// The following options are only supported by the sync command, not by the cp command.
OptionDelete,
OptionBackupDir,
},
},
}
// formatHelpForWhole is part of the FormatHelper interface; it returns
// whatever the wrapped command's formatHelpForWhole produces.
func (sc *SyncCommand) formatHelpForWhole() string {
	helpText := sc.command.formatHelpForWhole()
	return helpText
}
// formatIndependHelp returns whatever the wrapped command's
// formatIndependHelp produces.
func (sc *SyncCommand) formatIndependHelp() string {
	helpText := sc.command.formatIndependHelp()
	return helpText
}
// Init simulate inheritance, and polymorphism
func (sc *SyncCommand) Init(args []string, options OptionMapType) error {
recursive := true
bakupOptions := make(OptionMapType)
for k, v := range options {
bakupOptions[k] = v
}
// force to recursive and delete unsurpported options
bakupOptions[OptionRecursion] = &recursive
delete(bakupOptions, OptionDelete)
delete(bakupOptions, OptionBackupDir)
copyCommand.cpOption.bSyncCommand = true
err := (©Command).Init(args, bakupOptions)
if err != nil {
return err
}
return sc.command.Init(args, options, sc)
}
// RunCommand simulates inheritance and polymorphism: it runs the sync command
// on top of the copy command.
//
// It reads the sync-specific options, validates the request payer and the
// include/exclude filters, parses the source and destination URLs and rejects
// local-to-local sync as well as a local source that is not a directory.
// Without the delete option the work is handed to copyCommand.RunCommand
// unchanged. With the delete option it first collects the keys on both sides,
// computes the destination keys that have no counterpart on the source, runs
// the copy, and only after the copy succeeded deletes the extra objects (cloud
// destination) or moves the extra files to the backup directory (local
// destination).
func (sc *SyncCommand) RunCommand() error {
sc.syncOption.bDelete, _ = GetBool(OptionDelete, sc.command.options)
sc.syncOption.encodingType, _ = GetString(OptionEncodingType, sc.command.options)
sc.syncOption.backupDir, _ = GetString(OptionBackupDir, sc.command.options)
// options used when listing local files
sc.syncOption.enableSymlinkDir, _ = GetBool(OptionEnableSymlinkDir, sc.command.options)
sc.syncOption.onlyCurrentDir, _ = GetBool(OptionOnlyCurrentDir, sc.command.options)
sc.syncOption.disableDirObject, _ = GetBool(OptionDisableDirObject, sc.command.options)
sc.syncOption.disableAllSymlink, _ = GetBool(OptionDisableAllSymlink, sc.command.options)
sc.syncOption.force, _ = GetBool(OptionForce, sc.command.options)
// checkpoint dir
sc.syncOption.cpDir, _ = GetString(OptionCheckpointDir, sc.command.options)
// request payer: only the lower-cased oss.Requester value is accepted
payer, _ := GetString(OptionRequestPayer, sc.command.options)
if payer != "" {
if payer != strings.ToLower(string(oss.Requester)) {
return fmt.Errorf("invalid request payer: %s, please check", payer)
}
sc.syncOption.payerOptions = append(sc.syncOption.payerOptions, oss.RequestPayer(oss.PayerType(payer)))
}
// include/exclude filters, parsed from the raw command line
var res bool
res, sc.syncOption.filters = getFilter(os.Args)
if !res {
return fmt.Errorf("--include or --exclude does not support format containing dir info")
}
for k, v := range sc.syncOption.filters {
LogInfo("filter %d,name:%s,pattern:%s\n", k, v.name, v.pattern)
}
srcURL, err := StorageURLFromString(sc.command.args[0], sc.syncOption.encodingType)
if err != nil {
return err
}
destURL, err := StorageURLFromString(sc.command.args[1], sc.syncOption.encodingType)
if err != nil {
return err
}
if srcURL.IsFileURL() && destURL.IsFileURL() {
return fmt.Errorf("not support sync between local directory")
}
// a local source must be an existing directory
if srcURL.IsFileURL() {
f, err := os.Stat(srcURL.ToString())
if err != nil {
return err
}
if !f.IsDir() {
return fmt.Errorf("src %s is not directory", srcURL.ToString())
}
}
// without the delete option, sync is just the copy command
if !sc.syncOption.bDelete {
return copyCommand.RunCommand()
}
// the sync command adds '/' after a cloud prefix;
// the cp command must behave the same way when it runs as the sync command
srcURL = sc.adjustCloudUrl(srcURL)
destURL = sc.adjustCloudUrl(destURL)
// check backup dir (only needed when the destination is local)
if destURL.IsFileURL() {
err = sc.CheckDestBackupDir(destURL)
if err != nil {
return err
}
}
opType := sc.getCommandType(srcURL, destURL)
// get file list or object key list of both sides
srcKeys := make(map[string]string)
destKeys := make(map[string]string)
if srcURL.IsFileURL() {
err = sc.GetLocalFileKeys(srcURL, srcKeys)
} else {
err = sc.GetOssKeys(srcURL, srcKeys)
}
if err != nil {
return err
}
if destURL.IsFileURL() {
err = sc.GetLocalFileKeys(destURL, destKeys)
} else {
err = sc.GetOssKeys(destURL, destKeys)
}
if err != nil {
return err
}
// Get keys to be deleted: drop every source key from destKeys so that only
// the destination-only keys remain. When the local path separator is not
// "/", source keys are converted to the destination's separator first.
bSame := (string(os.PathSeparator) == "/")
for k, _ := range srcKeys {
if bSame || opType == operationTypeCopy {
delete(destKeys, k)
} else if opType == operationTypePut {
delete(destKeys, strings.Replace(k, "\\", "/", -1))
} else {
delete(destKeys, strings.Replace(k, "/", "\\", -1))
}
}
if destURL.IsFileURL() {
fmt.Printf("\nfile(directory) will be removed count:%d\n", len(destKeys))
} else {
fmt.Printf("\nobject will be deleted count:%d\n", len(destKeys))
}
// run the copy first; nothing is deleted if it fails
err = copyCommand.RunCommand()
if err != nil {
return err
}
// move dest files or remove dest objects which do not exist in src
if opType == operationTypeCopy || opType == operationTypePut {
err = sc.DeleteExtraObjects(destKeys, destURL)
} else {
err = sc.RemoveExtraFiles(destKeys, destURL)
}
return err
}
// adjustCloudUrl makes sure a non-empty cloud object prefix ends with "/".
// File URLs are returned untouched; for cloud URLs a (possibly modified)
// CloudURL value is returned.
func (sc *SyncCommand) adjustCloudUrl(sUrl StorageURLer) StorageURLer {
	if !sUrl.IsFileURL() {
		cloud := sUrl.(CloudURL)
		if cloud.object != "" && !strings.HasSuffix(cloud.object, "/") {
			cloud.object = cloud.object + "/"
		}
		return cloud
	}
	return sUrl
}
// DeleteExtraObjects deletes the given objects from the bucket named by sUrl.
//
// keys maps each relative key to its prefix; the object name sent to the
// service is prefix + relative key. Objects are deleted in batches of at most
// MaxBatchCount, and every batch must be confirmed through sc.confirm (which
// answers yes automatically when the force option is set). A declined batch is
// skipped and is not counted as deleted. The running total of deleted objects
// is printed after each successful batch.
//
// sUrl must hold a CloudURL. The first error from opening the bucket or from a
// batch deletion is returned.
func (sc *SyncCommand) DeleteExtraObjects(keys map[string]string, sUrl StorageURLer) error {
	bucketName := sUrl.(CloudURL).bucket
	bucket, err := sc.command.ossBucket(bucketName)
	if err != nil {
		return err
	}

	// Build the option list in a fresh slice so the shared payerOptions
	// backing array is never written to.
	rmOptions := make([]oss.Option, 0, len(sc.syncOption.payerOptions)+1)
	rmOptions = append(rmOptions, sc.syncOption.payerOptions...)
	rmOptions = append(rmOptions, oss.DeleteObjectsQuiet(true))

	deleteCount := 0
	// flush asks for confirmation and deletes one batch; only batches that
	// were really deleted are added to the counter.
	flush := func(batch []string) error {
		if len(batch) == 0 || !sc.confirm(batch) {
			return nil
		}
		if err := sc.BatchRmObjects(bucket, batch, rmOptions); err != nil {
			return err
		}
		deleteCount += len(batch)
		fmt.Printf("\rdelete object count:%d", deleteCount)
		return nil
	}

	var objects []string
	for k, v := range keys {
		if len(objects) >= MaxBatchCount {
			if err := flush(objects); err != nil {
				return err
			}
			objects = objects[:0]
		}
		// prefix + relativeKey
		objects = append(objects, v+k)
	}
	return flush(objects)
}
// RemoveExtraFiles moves the local entries listed in keys out of the
// destination directory sUrl and into the backup directory.
//
// The map keys are paths relative to the destination; a key ending with the
// path separator denotes a directory. Keys are processed in reverse sorted
// order so that the files inside a directory are handled before the directory
// entry itself.
//
//   - Files are moved to backupDir + key; the parent directory below the
//     backup directory is created on demand. A failed move aborts with that
//     error.
//   - Directories are only touched once they are empty: if nothing exists at
//     the backup location the directory is moved there, if a directory already
//     exists there the now-empty source directory is deleted, and if a regular
//     file is in the way an error is returned.
func (sc *SyncCommand) RemoveExtraFiles(keys map[string]string, sUrl StorageURLer) error {
	var ordered []string
	for name := range keys {
		ordered = append(ordered, name)
	}
	// remove files first, then the directories that contained them
	sort.Sort(sort.Reverse(sort.StringSlice(ordered)))

	destRoot, err := sc.GetAbsPath(sUrl.ToString())
	if err != nil {
		return err
	}

	sep := string(os.PathSeparator)
	backupRoot := sc.syncOption.backupDir
	lastParent := ""

	for _, key := range ordered {
		if !strings.HasSuffix(key, sep) {
			// regular file: make sure its parent exists in the backup tree
			parent := destRoot
			if cut := strings.LastIndex(key, sep); cut >= 0 {
				parent = key[:cut]
			}
			if parent != lastParent && parent != destRoot {
				os.MkdirAll(backupRoot+parent, 0755)
				lastParent = parent
			}
			if moveErr := sc.movePath(destRoot+key, backupRoot+key); moveErr != nil {
				return moveErr
			}
			continue
		}

		// directory entry: strip the trailing separator
		dir := key[:len(key)-1]
		entries, _ := sc.readDirLimit(destRoot+dir, 10)
		if len(entries) > 0 {
			// still has content, leave it in place
			continue
		}

		// empty dir, need to move or delete it
		info, statErr := os.Stat(backupRoot + dir)
		switch {
		case statErr != nil:
			sc.movePath(destRoot+dir, backupRoot+dir)
		case !info.IsDir():
			return fmt.Errorf("backup %s is already exist,but is file", backupRoot+dir)
		default:
			// the backup already has this directory, so just delete the source
			os.RemoveAll(destRoot + dir)
		}
	}
	return nil
}
// BatchRmObjects deletes objects from bucket in a single DeleteObjects call
// using the given options.
//
// A request error is returned as is. If the result's DeletedObjects list is
// not empty, the listed keys are logged and reported as an error; otherwise
// every requested key is logged as successfully deleted.
// NOTE(review): treating a non-empty DeletedObjects as failure presumably
// relies on the caller requesting quiet mode — confirm against the OSS API.
func (sc *SyncCommand) BatchRmObjects(bucket *oss.Bucket, objects []string, options []oss.Option) error {
	result, err := bucket.DeleteObjects(objects, options...)
	if err != nil {
		return err
	}

	if reported := result.DeletedObjects; len(reported) != 0 {
		var errMsg string
		for i := range reported {
			errMsg = errMsg + " " + reported[i]
		}
		LogError("delete erro %s\n", errMsg)
		return fmt.Errorf("delete %s error", errMsg)
	}

	for i := range objects {
		LogInfo("delete object success %s\n", objects[i])
	}
	return nil
}
// getCommandType classifies the transfer direction: a non-cloud source is a
// put, a cloud source with a file destination is a get, and anything else is
// a copy.
func (sc *SyncCommand) getCommandType(srcURL StorageURLer, destURL StorageURLer) operationType {
	switch {
	case !srcURL.IsCloudURL():
		return operationTypePut
	case destURL.IsFileURL():
		return operationTypeGet
	default:
		return operationTypeCopy
	}
}
// GetLocalFileKeys fills keys with the relative paths of the local files
// found under sUrl.
//
// A lister goroutine (GetFileList) feeds file entries into a channel that a
// reader goroutine (ReadLocalFileKeys) drains into keys. The two goroutines
// report on separate channels so that a listing error cannot be lost when the
// reader's completion signal happens to arrive first: the function only
// returns nil after the reader has finished AND the lister has returned
// without error. Any error from either side is returned as soon as it is
// seen.
func (sc *SyncCommand) GetLocalFileKeys(sUrl StorageURLer, keys map[string]string) error {
	strPath := sUrl.ToString()
	if !strings.HasSuffix(strPath, string(os.PathSeparator)) {
		// for symlink dir
		strPath += string(os.PathSeparator)
	}

	chFiles := make(chan fileInfoType, ChannelBuf)
	// The reader sends at most an error followed by nil, the lister at most
	// one error; the buffers keep both from blocking on send.
	chRead := make(chan error, 2)
	chList := make(chan error, 1)

	go sc.ReadLocalFileKeys(chFiles, chRead, keys)
	go func() {
		sc.GetFileList(strPath, chFiles, chList)
		// Closing turns "lister returned without error" into a nil receive.
		close(chList)
	}()

	select {
	case err := <-chList:
		if err != nil {
			return err
		}
		// Listing finished cleanly; wait for the reader to drain the channel.
		return <-chRead
	case err := <-chRead:
		if err != nil {
			return err
		}
		// The reader only finishes once the file channel is closed, which is
		// presumably done by the lister on its way out; collect its result.
		return <-chList
	}
}
// GetFileList lists the files under strPath into chFiles via
// getFileListCommon, honouring the sync options for the current-directory
// restriction, symlink handling and filters. Only a listing error is sent on
// chFinish; nothing is sent on success.
func (sc *SyncCommand) GetFileList(strPath string, chFiles chan<- fileInfoType, chFinish chan<- error) {
	opt := &sc.syncOption
	if listErr := getFileListCommon(strPath, chFiles, opt.onlyCurrentDir,
		opt.disableAllSymlink, opt.enableSymlinkDir, opt.filters); listErr != nil {
		chFinish <- listErr
	}
}
// ReadLocalFileKeys drains chFiles and records the path of every entry that
// passes copyCommand.filterFile as a key in keys (with an empty value),
// printing a running count. If keys grows beyond MaxSyncNumbers an error is
// sent on chFinish and reading stops. A nil is always sent on chFinish last.
func (sc *SyncCommand) ReadLocalFileKeys(chFiles <-chan fileInfoType, chFinish chan<- error, keys map[string]string) {
	counted := 0
	fmt.Printf("\n")
	for entry := range chFiles {
		// exclude checkpoint files
		if !copyCommand.filterFile(entry, sc.syncOption.cpDir) {
			continue
		}
		counted++
		fmt.Printf("\rtotal file(directory) count:%d", counted)
		keys[entry.filePath] = ""
		if len(keys) > MaxSyncNumbers {
			fmt.Printf("\n")
			chFinish <- fmt.Errorf("over max sync numbers %d", MaxSyncNumbers)
			break
		}
	}
	fmt.Printf("\rtotal file(directory) count:%d", counted)
	chFinish <- nil
}
// GetAbsPath returns strPath as an absolute directory path that always ends
// with a path separator.
//
// An already absolute path is returned unchanged apart from a trailing
// separator being appended when it is missing; callers build file names by
// plain concatenation (absDir + relativeKey), so the separator must be there
// for absolute and relative inputs alike. A relative path is resolved against
// the current working directory and cleaned.
//
// An error is returned if the working directory cannot be determined or the
// path cannot be made absolute.
func (sc *SyncCommand) GetAbsPath(strPath string) (string, error) {
	sep := string(os.PathSeparator)

	if filepath.IsAbs(strPath) {
		// IsAbs implies a non-empty path, so indexing the last byte is safe.
		if !os.IsPathSeparator(strPath[len(strPath)-1]) {
			strPath += sep
		}
		return strPath, nil
	}

	currentDir, err := os.Getwd()
	if err != nil {
		return "", err
	}

	// filepath.Abs cleans the result, which also strips any trailing separator.
	absPath, err := filepath.Abs(currentDir + sep + strPath)
	if err != nil {
		return "", err
	}
	if !strings.HasSuffix(absPath, sep) {
		absPath += sep
	}
	return absPath, nil
}
// CheckDestBackupDir validates the local destination directory sUrl and the
// backup directory, creating either of them when missing.
//
// Rules:
//   - The destination is created if it does not exist; it must not be a file.
//   - If the destination had to be created and no backup directory was given,
//     the check passes (a new directory has nothing to back up).
//   - Otherwise a backup directory is mandatory. It gets a trailing path
//     separator, must not be the destination itself or lie inside it, must
//     not be a file, and is created if it does not exist.
//
// The containment test compares cleaned absolute paths that both end with a
// separator and uses a prefix match, so sibling directories sharing a name
// prefix (dest "/data/a", backup "/data/ab") and unrelated paths that merely
// contain the destination as a substring are not rejected by mistake.
func (sc *SyncCommand) CheckDestBackupDir(sUrl StorageURLer) error {
	sep := string(os.PathSeparator)
	destDir := sUrl.ToString()

	// create dest dir
	createDirectory := false
	f, err := os.Stat(destDir)
	if err != nil {
		if err := os.MkdirAll(destDir, 0755); err != nil {
			return err
		}
		createDirectory = true
	} else if !f.IsDir() {
		return fmt.Errorf("dest dir %s is file,is not directory", destDir)
	}

	if sc.syncOption.backupDir == "" {
		if createDirectory {
			return nil
		}
		return fmt.Errorf("dest backup dir is empty string,please use --backup-dir")
	}
	if !strings.HasSuffix(sc.syncOption.backupDir, sep) {
		sc.syncOption.backupDir += sep
	}

	// check whether the backup dir is the dest dir or one of its subdirectories
	absfilePath, err := sc.GetAbsPath(destDir)
	if err != nil {
		return err
	}
	absBackPath, err := sc.GetAbsPath(sc.syncOption.backupDir)
	if err != nil {
		return err
	}
	// asDir normalises a path to its cleaned form with exactly one trailing
	// separator, so the prefix test below only matches whole path elements.
	asDir := func(p string) string {
		p = filepath.Clean(p)
		if !strings.HasSuffix(p, sep) {
			p += sep
		}
		return p
	}
	if strings.HasPrefix(asDir(absBackPath), asDir(absfilePath)) {
		return fmt.Errorf("backup dir %s is subdirectory of %s", sc.syncOption.backupDir, destDir)
	}

	// create backup dir
	f, err = os.Stat(sc.syncOption.backupDir)
	if err != nil {
		if err := os.MkdirAll(sc.syncOption.backupDir, 0755); err != nil {
			return err
		}
	} else if !f.IsDir() {
		return fmt.Errorf("dest backup dir %s is file,is not directory", sc.syncOption.backupDir)
	}
	return nil
}
// GetOssKeys fills keys with the objects found under the cloud URL sUrl,
// mapping each relative key to its prefix.
//
// A lister goroutine (GetOssKeyList) feeds object entries into a channel that
// a reader goroutine (ReadOssKeys) drains into keys. The two goroutines report
// on separate channels so that a listing error cannot be lost when the
// reader's completion signal happens to arrive first: the function only
// returns nil after the reader has finished AND the lister has returned
// without error. Any error from either side is returned as soon as it is
// seen.
//
// sUrl must hold a CloudURL.
func (sc *SyncCommand) GetOssKeys(sUrl StorageURLer, keys map[string]string) error {
	bucketName := sUrl.(CloudURL).bucket
	bucket, err := sc.command.ossBucket(bucketName)
	if err != nil {
		return err
	}

	chObjects := make(chan objectInfoType, ChannelBuf)
	// The reader sends at most an error followed by nil, the lister at most
	// one error; the buffers keep both from blocking on send.
	chRead := make(chan error, 2)
	chList := make(chan error, 1)

	go sc.ReadOssKeys(keys, sUrl, chObjects, chRead)
	go func() {
		sc.GetOssKeyList(bucket, sUrl, chObjects, chList)
		// Closing turns "lister returned without error" into a nil receive.
		close(chList)
	}()

	select {
	case err := <-chList:
		if err != nil {
			return err
		}
		// Listing finished cleanly; wait for the reader to drain the channel.
		return <-chRead
	case err := <-chRead:
		if err != nil {
			return err
		}
		// The reader only finishes once the object channel is closed, which is
		// presumably done by the lister on its way out; collect its result.
		return <-chList
	}
}
// GetOssKeyList lists the objects under the cloud URL sURL into chObjects via
// getObjectListCommon, honouring the current-directory restriction, filters
// and payer options. Only a listing error is sent on chFinish; nothing is sent
// on success. sURL must hold a CloudURL.
func (sc *SyncCommand) GetOssKeyList(bucket *oss.Bucket, sURL StorageURLer, chObjects chan<- objectInfoType, chFinish chan<- error) {
	opt := &sc.syncOption
	if listErr := getObjectListCommon(bucket, sURL.(CloudURL), chObjects, opt.onlyCurrentDir,
		opt.filters, opt.payerOptions); listErr != nil {
		chFinish <- listErr
	}
}
// ReadOssKeys drains chObjects and stores every object in keys as
// relativeKey -> prefix, printing a running count for sURL. If keys grows
// beyond MaxSyncNumbers an error is sent on chFinish and reading stops. A nil
// is always sent on chFinish last.
func (sc *SyncCommand) ReadOssKeys(keys map[string]string, sURL StorageURLer, chObjects <-chan objectInfoType, chFinish chan<- error) {
	counted := 0
	fmt.Printf("\n")
	for object := range chObjects {
		counted++
		fmt.Printf("\r%s,total oss object count:%d", sURL.ToString(), counted)
		keys[object.relativeKey] = object.prefix
		if len(keys) <= MaxSyncNumbers {
			continue
		}
		fmt.Printf("\n")
		chFinish <- fmt.Errorf("over max sync numbers %d", MaxSyncNumbers)
		break
	}
	fmt.Printf("\r%s,total oss object count:%d", sURL.ToString(), counted)
	chFinish <- nil
}
// confirm asks the user whether the listed keys may be deleted.
//
// With the force option set it returns true without prompting. Otherwise it
// prints the keys, one per line, followed by a prompt, reads one line from
// standard input and returns true only for "y" or "yes" (case-insensitive).
// A read error counts as "no".
//
// The prompt is written with fmt.Print rather than used as a format string,
// so keys containing '%' are shown verbatim.
func (sc *SyncCommand) confirm(keys []string) bool {
	if sc.syncOption.force {
		return true
	}

	var logBuffer bytes.Buffer
	logBuffer.WriteString("\n")
	for _, v := range keys {
		logBuffer.WriteString(v)
		logBuffer.WriteString("\n")
	}
	logBuffer.WriteString("sync:delete above objects(y or N)? ")
	fmt.Print(logBuffer.String())

	var val string
	if _, err := fmt.Scanln(&val); err != nil {
		return false
	}
	switch strings.ToLower(val) {
	case "y", "yes":
		return true
	}
	return false
}
// readDirLimit opens dirName and returns what a single Readdir(limitCount)
// call yields. Any error from opening or reading (including the error Readdir
// reports when a positive limit finds no entries) is returned with a nil
// list.
func (sc *SyncCommand) readDirLimit(dirName string, limitCount int) ([]os.FileInfo, error) {
	dir, openErr := os.Open(dirName)
	if openErr != nil {
		return nil, openErr
	}
	defer dir.Close()

	entries, readErr := dir.Readdir(limitCount)
	if readErr != nil {
		return nil, readErr
	}
	return entries, nil
}
// movePath moves srcName to destName via moveFileToPath. A failure is logged
// and returned; on success the removed-path counter is incremented, the
// running count is printed and the move is logged.
func (sc *SyncCommand) movePath(srcName, destName string) error {
	if moveErr := sc.moveFileToPath(srcName, destName); moveErr != nil {
		LogError("rename %s %s error,%s\n", srcName, destName, moveErr.Error())
		return moveErr
	}

	sc.syncOption.removeCount++
	fmt.Printf("\rremove file(directory) count:%d", sc.syncOption.removeCount)
	LogInfo("rename success %s %s\n", srcName, destName)
	return nil
}
// moveFileToPath moves srcName to destName.
//
// It first tries os.Rename. If that fails it falls back to copying the
// contents into a newly created destName and then removing srcName.
//
// In the fallback both files are closed before the source is removed, because
// an open file cannot be deleted on every platform, and an error from closing
// the destination is reported, since it can indicate that buffered data never
// reached the disk. The source is only removed after the copy has completely
// succeeded.
func (sc *SyncCommand) moveFileToPath(srcName, destName string) error {
	if err := os.Rename(srcName, destName); err == nil {
		return nil
	}

	// Rename failed; fall back to copy + remove.
	inputFile, err := os.Open(srcName)
	if err != nil {
		return err
	}
	outputFile, err := os.Create(destName)
	if err != nil {
		inputFile.Close()
		return err
	}

	_, err = io.Copy(outputFile, inputFile)
	inputFile.Close()
	if closeErr := outputFile.Close(); err == nil {
		err = closeErr
	}
	if err != nil {
		return err
	}
	return os.Remove(srcName)
}