oss/checkpoint.go (273 lines of code) (raw):

package oss import ( "crypto/md5" "encoding/hex" "encoding/json" "fmt" "io" "net/http" "os" "path/filepath" "reflect" "strconv" "strings" ) // ----- download checkpoint ----- type downloadCheckpoint struct { CpDirPath string // checkpoint dir full path CpFilePath string // checkpoint file full path VerifyData bool // verify downloaded data in FilePath Loaded bool // If Info.Data.DownloadInfo is loaded from checkpoint Info struct { //checkpoint data Magic string // Magic MD5 string // The Data's MD5 Data struct { // source ObjectInfo struct { Name string // oss://bucket/key VersionId string Range string } ObjectMeta struct { Size int64 LastModified string ETag string } // destination FilePath string // Local file // download info PartSize int64 DownloadInfo struct { Offset int64 CRC64 uint64 } } } } func newDownloadCheckpoint(request *GetObjectRequest, filePath string, baseDir string, header http.Header, partSize int64) *downloadCheckpoint { var buf strings.Builder name := fmt.Sprintf("%v/%v", ToString(request.Bucket), ToString(request.Key)) buf.WriteString("oss://" + escapePath(name, false)) buf.WriteString("\n") buf.WriteString(ToString(request.VersionId)) buf.WriteString("\n") buf.WriteString(ToString(request.Range)) hashmd5 := md5.New() hashmd5.Write([]byte(buf.String())) srcHash := hex.EncodeToString(hashmd5.Sum(nil)) absPath, _ := filepath.Abs(filePath) hashmd5.Reset() hashmd5.Write([]byte(absPath)) destHash := hex.EncodeToString(hashmd5.Sum(nil)) var dir string if baseDir == "" { dir = os.TempDir() } else { dir = filepath.Dir(baseDir) } cpFilePath := filepath.Join(dir, fmt.Sprintf("%v-%v%v", srcHash, destHash, CheckpointFileSuffixDownloader)) cp := &downloadCheckpoint{ CpFilePath: cpFilePath, CpDirPath: dir, } objectSize, _ := strconv.ParseInt(header.Get("Content-Length"), 10, 64) cp.Info.Magic = CheckpointMagic cp.Info.Data.ObjectInfo.Name = "oss://" + name cp.Info.Data.ObjectInfo.VersionId = ToString(request.VersionId) cp.Info.Data.ObjectInfo.Range = ToString(request.Range) cp.Info.Data.ObjectMeta.Size = objectSize cp.Info.Data.ObjectMeta.LastModified = header.Get("Last-Modified") cp.Info.Data.ObjectMeta.ETag = header.Get("ETag") cp.Info.Data.FilePath = filePath cp.Info.Data.PartSize = partSize return cp } // load checkpoint from local file func (cp *downloadCheckpoint) load() error { if !DirExists(cp.CpDirPath) { return fmt.Errorf("Invaid checkpoint dir, %v", cp.CpDirPath) } if !FileExists(cp.CpFilePath) { return nil } if !cp.valid() { cp.remove() return nil } cp.Loaded = true return nil } func (cp *downloadCheckpoint) valid() bool { // Compare the CP's Magic and the MD5 contents, err := os.ReadFile(cp.CpFilePath) if err != nil { return false } dcp := downloadCheckpoint{} if err = json.Unmarshal(contents, &dcp.Info); err != nil { return false } js, _ := json.Marshal(dcp.Info.Data) sum := md5.Sum(js) md5sum := hex.EncodeToString(sum[:]) if CheckpointMagic != dcp.Info.Magic || md5sum != dcp.Info.MD5 { return false } // compare if !reflect.DeepEqual(cp.Info.Data.ObjectInfo, dcp.Info.Data.ObjectInfo) || !reflect.DeepEqual(cp.Info.Data.ObjectMeta, dcp.Info.Data.ObjectMeta) || cp.Info.Data.FilePath != dcp.Info.Data.FilePath || cp.Info.Data.PartSize != dcp.Info.Data.PartSize { return false } // download info if dcp.Info.Data.DownloadInfo.Offset < 0 { return false } if dcp.Info.Data.DownloadInfo.Offset == 0 && dcp.Info.Data.DownloadInfo.CRC64 != 0 { return false } rOffset := int64(0) if len(cp.Info.Data.ObjectInfo.Range) > 0 { if r, err := ParseRange(cp.Info.Data.ObjectInfo.Range); err != nil { return false } else { rOffset = r.Offset } } if dcp.Info.Data.DownloadInfo.Offset < rOffset { return false } remains := (dcp.Info.Data.DownloadInfo.Offset - rOffset) % dcp.Info.Data.PartSize if remains != 0 { return false } //valid data if cp.VerifyData && dcp.Info.Data.DownloadInfo.CRC64 != 0 { if file, err := os.Open(cp.Info.Data.FilePath); err == nil { hash := NewCRC64(0) limitN := dcp.Info.Data.DownloadInfo.Offset - rOffset io.Copy(hash, io.LimitReader(file, limitN)) file.Close() if hash.Sum64() != dcp.Info.Data.DownloadInfo.CRC64 { return false } } } // update cp.Info.Data.DownloadInfo = dcp.Info.Data.DownloadInfo return true } // dump dumps to file func (cp *downloadCheckpoint) dump() error { // Calculate MD5 js, _ := json.Marshal(cp.Info.Data) sum := md5.Sum(js) md5sum := hex.EncodeToString(sum[:]) cp.Info.MD5 = md5sum // Serialize js, err := json.Marshal(cp.Info) if err != nil { return err } // Dump return os.WriteFile(cp.CpFilePath, js, FilePermMode) } func (cp *downloadCheckpoint) remove() error { return os.Remove(cp.CpFilePath) } // ----- upload chcekpoint ----- type uploadCheckpoint struct { CpDirPath string // checkpoint dir full path CpFilePath string // checkpoint file full path Loaded bool // If Info.Data.UploadInfo is loaded from checkpoint Info struct { //checkpoint data Magic string // Magic MD5 string // The Data's MD5 Data struct { // source FilePath string // Local file FileMeta struct { Size int64 LastModified string } // destination ObjectInfo struct { Name string // oss://bucket/key } // upload info PartSize int64 UploadInfo struct { UploadId string } } } } func newUploadCheckpoint(request *PutObjectRequest, filePath string, baseDir string, fileInfo os.FileInfo, partSize int64) *uploadCheckpoint { name := fmt.Sprintf("%v/%v", ToString(request.Bucket), ToString(request.Key)) hashmd5 := md5.New() hashmd5.Write([]byte("oss://" + escapePath(name, false))) destHash := hex.EncodeToString(hashmd5.Sum(nil)) absPath, _ := filepath.Abs(filePath) hashmd5.Reset() hashmd5.Write([]byte(absPath)) srcHash := hex.EncodeToString(hashmd5.Sum(nil)) var dir string if baseDir == "" { dir = os.TempDir() } else { dir = filepath.Dir(baseDir) } cpFilePath := filepath.Join(dir, fmt.Sprintf("%v-%v%v", srcHash, destHash, CheckpointFileSuffixUploader)) cp := &uploadCheckpoint{ CpFilePath: cpFilePath, CpDirPath: dir, } cp.Info.Magic = CheckpointMagic cp.Info.Data.FilePath = filePath cp.Info.Data.FileMeta.Size = fileInfo.Size() cp.Info.Data.FileMeta.LastModified = fileInfo.ModTime().String() cp.Info.Data.ObjectInfo.Name = "oss://" + name cp.Info.Data.PartSize = partSize return cp } // load checkpoint from local file func (cp *uploadCheckpoint) load() error { if !DirExists(cp.CpDirPath) { return fmt.Errorf("Invaid checkpoint dir, %v", cp.CpDirPath) } if !FileExists(cp.CpFilePath) { return nil } if !cp.valid() { cp.remove() return nil } cp.Loaded = true return nil } func (cp *uploadCheckpoint) valid() bool { // Compare the CP's Magic and the MD5 contents, err := os.ReadFile(cp.CpFilePath) if err != nil { return false } dcp := uploadCheckpoint{} if err = json.Unmarshal(contents, &dcp.Info); err != nil { return false } js, _ := json.Marshal(dcp.Info.Data) sum := md5.Sum(js) md5sum := hex.EncodeToString(sum[:]) if CheckpointMagic != dcp.Info.Magic || md5sum != dcp.Info.MD5 { return false } // compare if !reflect.DeepEqual(cp.Info.Data.ObjectInfo, dcp.Info.Data.ObjectInfo) || !reflect.DeepEqual(cp.Info.Data.FileMeta, dcp.Info.Data.FileMeta) || cp.Info.Data.FilePath != dcp.Info.Data.FilePath || cp.Info.Data.PartSize != dcp.Info.Data.PartSize { return false } // download info if len(dcp.Info.Data.UploadInfo.UploadId) == 0 { return false } // update cp.Info.Data.UploadInfo = dcp.Info.Data.UploadInfo return true } // dump dumps to file func (cp *uploadCheckpoint) dump() error { // Calculate MD5 js, _ := json.Marshal(cp.Info.Data) sum := md5.Sum(js) md5sum := hex.EncodeToString(sum[:]) cp.Info.MD5 = md5sum // Serialize js, err := json.Marshal(cp.Info) if err != nil { return err } // Dump return os.WriteFile(cp.CpFilePath, js, FilePermMode) } func (cp *uploadCheckpoint) remove() error { return os.Remove(cp.CpFilePath) }