nimo-shake/checkpoint/fileWriter.go (306 lines of code) (raw):

package checkpoint

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io/ioutil"
	"os"
	"path/filepath"
	"reflect"
	"strings"
	"sync"

	LOG "github.com/vinllen/log4go"
)

// FileWriter persists checkpoints as files inside a single directory.
//
// Every table is one file named after the table. A table file holds one
// JSON-encoded Checkpoint per line; the status file (CheckpointStatusTable)
// holds a single JSON-encoded Status.
type FileWriter struct {
	dir         string
	fileHandler *sync.Map // file name -> fd (not used by the methods in this file)
	fileLock    *sync.Map // file name -> *sync.Mutex guarding that file
}

// NewFileWriter returns a FileWriter rooted at dir. When dir does not exist it
// is created together with any missing parents. If the directory cannot be
// inspected or created, the failure is reported through LOG.Crashf and nil is
// returned.
func NewFileWriter(dir string) *FileWriter {
	if _, err := os.Stat(dir); err != nil {
		if !os.IsNotExist(err) {
			LOG.Crashf("stat dir[%v] failed[%v]", dir, err)
			return nil
		}
		if err = os.MkdirAll(dir, 0755); err != nil {
			LOG.Crashf("create dir[%v] failed[%v]", dir, err)
			return nil
		}
	}

	return &FileWriter{
		dir:         dir,
		fileHandler: new(sync.Map),
		fileLock:    new(sync.Map),
	}
}

// tablePath returns the path of the file that backs the given table.
func (fw *FileWriter) tablePath(table string) string {
	return filepath.Join(fw.dir, table)
}

// FindStatus returns the Value stored in the status file. When the status file
// does not exist yet it returns CheckpointStatusValueEmpty and a nil error.
func (fw *FileWriter) FindStatus() (string, error) {
	fw.lockFile(CheckpointStatusTable)
	defer fw.unlockFile(CheckpointStatusTable)

	// Read directly instead of Stat-then-Open so the file cannot vanish
	// between the existence check and the read.
	byteValue, err := ioutil.ReadFile(fw.tablePath(CheckpointStatusTable))
	if err != nil {
		if os.IsNotExist(err) {
			return CheckpointStatusValueEmpty, nil
		}
		return "", err
	}

	var ret Status
	if err := json.Unmarshal(byteValue, &ret); err != nil {
		return "", err
	}
	return ret.Value, nil
}

// UpdateStatus overwrites the status file with a Status whose Key is
// CheckpointStatusKey and whose Value is status.
func (fw *FileWriter) UpdateStatus(status string) error {
	fw.lockFile(CheckpointStatusTable)
	defer fw.unlockFile(CheckpointStatusTable)

	val, err := json.Marshal(&Status{
		Key:   CheckpointStatusKey,
		Value: status,
	})
	if err != nil {
		// The original returned nil here, hiding the failure from the caller.
		return err
	}

	return ioutil.WriteFile(fw.tablePath(CheckpointStatusTable), val, 0666)
}

// ExtractCheckpoint loads every checkpoint file found under the directory and
// returns a map of table name -> shard id -> checkpoint. Files for which
// FilterCkptCollection returns true are skipped.
func (fw *FileWriter) ExtractCheckpoint() (map[string]map[string]*Checkpoint, error) {
	// fileList holds the last path element of every entry except the root
	// directory itself.
	var fileList []string
	err := filepath.Walk(fw.dir, func(path string, info os.FileInfo, err error) error {
		if err != nil {
			// Surface walk failures; otherwise an unreadable directory would
			// be indistinguishable from one that holds no checkpoints.
			return err
		}
		if path != fw.dir {
			// Normalize separators first so the split also works where the
			// OS path separator is not "/".
			parts := strings.Split(filepath.ToSlash(path), "/")
			fileList = append(fileList, parts[len(parts)-1])
		}
		return nil
	})
	if err != nil {
		return nil, err
	}

	ckptMap := make(map[string]map[string]*Checkpoint, len(fileList))
	for _, file := range fileList {
		if FilterCkptCollection(file) {
			continue
		}

		innerMap, err := fw.ExtractSingleCheckpoint(file)
		if err != nil {
			return nil, err
		}
		ckptMap[file] = innerMap
	}

	return ckptMap, nil
}

// ExtractSingleCheckpoint loads all checkpoints of one table and returns them
// keyed by ShardId. If several records share a ShardId the last one wins.
func (fw *FileWriter) ExtractSingleCheckpoint(table string) (map[string]*Checkpoint, error) {
	fw.lockFile(table)
	defer fw.unlockFile(table)

	jsonFile, err := os.Open(fw.tablePath(table))
	if err != nil {
		return nil, err
	}
	defer jsonFile.Close()

	data, err := fw.readJsonList(jsonFile)
	if err != nil {
		return nil, err
	}

	innerMap := make(map[string]*Checkpoint, len(data))
	for _, ele := range data {
		innerMap[ele.ShardId] = ele
	}
	return innerMap, nil
}

// Insert appends ckpt as a new record to the table file, creating the file if
// needed. It returns an error when ckpt is nil.
func (fw *FileWriter) Insert(ckpt *Checkpoint, table string) error {
	if ckpt == nil {
		return fmt.Errorf("checkpoint is nil")
	}

	fw.lockFile(table)
	defer fw.unlockFile(table)

	file := fw.tablePath(table)
	jsonFile, err := os.OpenFile(file, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
	if err != nil {
		return err
	}
	defer jsonFile.Close()

	LOG.Debug("file[%s] insert data: %v", file, *ckpt)

	return fw.writeJsonList(jsonFile, []*Checkpoint{ckpt})
}

// Update replaces the first record of the table whose ShardId equals shardId
// with ckpt and rewrites the table file. It returns an error when ckpt is nil,
// when the table holds no records, or when no record matches shardId.
func (fw *FileWriter) Update(shardId string, ckpt *Checkpoint, table string) error {
	if ckpt == nil {
		return fmt.Errorf("checkpoint is nil")
	}

	return fw.rewriteShard(shardId, table, func(data []*Checkpoint, idx int) error {
		data[idx] = ckpt
		return nil
	})
}

// UpdateWithSet updates selected fields of the first record whose ShardId
// equals shardId and rewrites the table file. The keys of input are Checkpoint
// struct field names; only string fields are supported and every value must be
// a string. Nothing is written to the file when any key or value is rejected.
func (fw *FileWriter) UpdateWithSet(shardId string, input map[string]interface{}, table string) error {
	return fw.rewriteShard(shardId, table, func(data []*Checkpoint, idx int) error {
		target := reflect.ValueOf(data[idx]).Elem()
		for key, val := range input {
			field := target.FieldByName(key)
			switch field.Kind() {
			case reflect.String:
				v, ok := val.(string)
				if !ok {
					// The original silently stored "" for non-string values.
					return fmt.Errorf("value of field[%v] has type[%T], expect string", key, val)
				}
				if !field.CanSet() {
					// SetString panics on a non-settable (unexported) field.
					return fmt.Errorf("field[%v] can't be set", key)
				}
				field.SetString(v)
			case reflect.Invalid:
				printData, _ := json.Marshal(data[idx])
				return fmt.Errorf("invalid field[%v], current checkpoint[%s], input checkpoint[%v]",
					key, printData, input)
			default:
				printData, _ := json.Marshal(data[idx])
				return fmt.Errorf("unknown type[%v] of field[%v], current checkpoint[%s], input checkpoint[%v]",
					field.Kind(), key, printData, input)
			}
		}
		return nil
	})
}

// rewriteShard is the shared read-modify-write cycle behind Update and
// UpdateWithSet. Under the table lock it loads all records, locates the first
// one whose ShardId equals shardId, lets mutate change data at that index and
// then rewrites the whole file. The file is left untouched when mutate fails.
func (fw *FileWriter) rewriteShard(shardId string, table string, mutate func(data []*Checkpoint, idx int) error) error {
	fw.lockFile(table)
	defer fw.unlockFile(table)

	jsonFile, err := os.OpenFile(fw.tablePath(table), os.O_RDWR|os.O_CREATE, 0666)
	if err != nil {
		return err
	}
	defer jsonFile.Close()

	data, err := fw.readJsonList(jsonFile)
	if err != nil {
		return err
	}
	if len(data) == 0 {
		return fmt.Errorf("empty data")
	}

	idx := -1
	for i := range data {
		if data[i].ShardId == shardId {
			idx = i
			break
		}
	}
	if idx < 0 {
		return fmt.Errorf("shardId[%v] not exists", shardId)
	}

	if err := mutate(data, idx); err != nil {
		return err
	}

	// Reset the file before rewriting; a failure here must not be ignored,
	// otherwise new records could be written after stale content.
	if err := jsonFile.Truncate(0); err != nil {
		return err
	}
	if _, err := jsonFile.Seek(0, 0); err != nil {
		return err
	}

	return fw.writeJsonList(jsonFile, data)
}

// Query returns the first record of the table whose ShardId equals shardId, or
// a "not found" error when there is none. The table file is created if it does
// not exist, because it is opened with os.O_CREATE.
func (fw *FileWriter) Query(shardId string, table string) (*Checkpoint, error) {
	fw.lockFile(table)
	defer fw.unlockFile(table)

	jsonFile, err := os.OpenFile(fw.tablePath(table), os.O_RDWR|os.O_CREATE, 0666)
	if err != nil {
		return nil, err
	}
	defer jsonFile.Close()

	data, err := fw.readJsonList(jsonFile)
	if err != nil {
		return nil, err
	}

	for _, ele := range data {
		if ele.ShardId == shardId {
			return ele, nil
		}
	}

	return nil, fmt.Errorf("not found")
}

// DropAll removes every entry found under the directory. The directory itself
// is kept. It stops at the first entry that cannot be removed.
func (fw *FileWriter) DropAll() error {
	var fileList []string
	err := filepath.Walk(fw.dir, func(path string, info os.FileInfo, err error) error {
		if err != nil {
			return err
		}
		if path != fw.dir {
			fileList = append(fileList, path)
		}
		return nil
	})
	if err != nil {
		return err
	}

	LOG.Info("drop file list: %v", fileList)

	for _, file := range fileList {
		// Locks are keyed by table (file) name, not by full path.
		name := filepath.Base(file)
		fw.lockFile(name)
		err := os.Remove(file)
		fw.unlockFile(name)
		if err != nil {
			return err
		}
	}

	return nil
}

// lockFile acquires the mutex that belongs to table, creating it on first use.
// LoadOrStore guarantees that concurrent first callers share one mutex.
func (fw *FileWriter) lockFile(table string) {
	val, _ := fw.fileLock.LoadOrStore(table, new(sync.Mutex))
	val.(*sync.Mutex).Lock()
}

// unlockFile releases the mutex that belongs to table. It does nothing when no
// mutex was ever created for table.
func (fw *FileWriter) unlockFile(table string) {
	if val, ok := fw.fileLock.Load(table); ok {
		val.(*sync.Mutex).Unlock()
	}
}

// readJsonList reads f to the end and decodes one Checkpoint per
// newline-terminated line. The segment after the last newline is ignored, so a
// final record that lacks its trailing newline is not returned.
func (fw *FileWriter) readJsonList(f *os.File) ([]*Checkpoint, error) {
	byteValue, err := ioutil.ReadAll(f)
	if err != nil {
		return nil, err
	}

	byteList := bytes.Split(byteValue, []byte{'\n'})
	ret := make([]*Checkpoint, 0, len(byteList))
	for i := 0; i < len(byteList)-1; i++ {
		var ele Checkpoint
		if err := json.Unmarshal(byteList[i], &ele); err != nil {
			return nil, err
		}
		ret = append(ret, &ele)
	}

	return ret, nil
}

// writeJsonList writes every element of input to f as one JSON document per
// line. All records are encoded before anything is written, so an encoding
// failure leaves f untouched and is returned to the caller.
func (fw *FileWriter) writeJsonList(f *os.File, input []*Checkpoint) error {
	var buf bytes.Buffer
	for _, single := range input {
		val, err := json.Marshal(single)
		if err != nil {
			return err
		}
		buf.Write(val)
		buf.WriteByte('\n') // record separator
	}

	_, err := f.Write(buf.Bytes())
	return err
}

// IncrCacheFileInsert is a no-op in this implementation and always returns nil.
func (fw *FileWriter) IncrCacheFileInsert(table string, shardId string, fileName string,
	lastSequenceNumber string, time string) error {
	return nil
}