nimo-shake/checkpoint/struct.go (96 lines of code) (raw):

package checkpoint

import (
	"encoding/json"
	"fmt"
	"sync"

	"nimo-shake/filter"
)

const (
	// Checkpoint writer type names.
	CheckpointWriterTypeMongo = "mongodb"
	CheckpointWriterTypeFile  = "file"

	// Name of the status table and the key/value strings used with it
	// (see Status below).
	CheckpointStatusTable         = "status_table"
	CheckpointStatusKey           = "status_key"
	CheckpointStatusValueEmpty    = ""
	CheckpointStatusValueFullSync = "full_sync"
	CheckpointStatusValueIncrSync = "incr_sync"

	// Values for Checkpoint.Status.
	// 0: not process; 1: no need to process; 2: prepare stage; 3: in processing; 4: wait father finish; 5: done
	StatusNotProcess     = "not process"
	StatusNoNeedProcess  = "no need to process"
	StatusPrepareProcess = "prepare stage"
	StatusInProcessing   = "in processing"
	StatusWaitFather     = "wait father finish"
	StatusDone           = "done"

	// Values for Checkpoint.IteratorType.
	IteratorTypeLatest        = "LATEST"
	IteratorTypeAtSequence    = "AT_SEQUENCE_NUMBER"
	IteratorTypeAfterSequence = "AFTER_SEQUENCE_NUMBER"
	IteratorTypeTrimHorizon   = "TRIM_HORIZON"

	InitShardIt    = "see sequence number"
	StreamViewType = "NEW_AND_OLD_IMAGES"

	// Field names; each value matches a bson/json tag on Checkpoint.
	FieldShardId         = "ShardId"
	FieldShardIt         = "ShardIt"
	FieldStatus          = "Status"
	FieldSeqNum          = "SequenceNumber"
	FieldIteratorType    = "IteratorType"
	FieldTimestamp       = "UpdateDate"
	FieldApproximateTime = "ApproximateTime"
)

// Checkpoint is the per-shard checkpoint record. Every field is a string and
// is serialized under the same name in both bson and json.
type Checkpoint struct {
	ShardId         string `bson:"ShardId" json:"ShardId"`                 // shard id
	FatherId        string `bson:"FatherId" json:"FatherId"`               // father id
	SequenceNumber  string `bson:"SequenceNumber" json:"SequenceNumber"`   // checkpoint
	Status          string `bson:"Status" json:"Status"`                   // status
	WorkerId        string `bson:"WorkerId" json:"WorkerId"`               // thread number
	IteratorType    string `bson:"IteratorType" json:"IteratorType"`       // "LATEST" or "AT_SEQUENCE_NUMBER"
	ShardIt         string `bson:"ShardIt" json:"ShardIt"`                 // only used when IteratorType == "LATEST"
	UpdateDate      string `bson:"UpdateDate" json:"UpdateDate"`           // update checkpoint time
	ApproximateTime string `bson:"ApproximateTime" json:"ApproximateTime"` // approximate time of records
}

// String returns the JSON encoding of c. If encoding fails, a short
// description of the error is returned instead of silently dropping it.
func (c *Checkpoint) String() string {
	out, err := json.Marshal(c)
	if err != nil {
		return fmt.Sprintf("checkpoint marshal failed: %v", err)
	}
	return string(out)
}

// Status is a key/value record holding a sync status value.
type Status struct {
	Key   string `bson:"Key" json:"Key"`                 // key -> CheckpointStatusKey
	Value string `bson:"StatusValue" json:"StatusValue"` // CheckpointStatusValueFullSync or CheckpointStatusValueIncrSync
}

/*---------------------------------------*/

var (
	// GlobalShardIteratorMap is the package-level ShardIteratorMap instance.
	GlobalShardIteratorMap = ShardIteratorMap{
		mp: make(map[string]string),
	}
)

// ShardIteratorMap is a string-to-string map whose accessors all take the
// embedded mutex. It must not be copied after first use because it contains a
// sync.Mutex; call its methods through a pointer or an addressable variable.
type ShardIteratorMap struct {
	mp   map[string]string
	lock sync.Mutex
}

// Set stores iterator under key only if key is not already present.
// It returns true when the value was stored and false when key already
// existed, in which case the existing value is left untouched.
func (sim *ShardIteratorMap) Set(key, iterator string) bool {
	sim.lock.Lock()
	defer sim.lock.Unlock()

	if _, exists := sim.mp[key]; exists {
		return false
	}

	// Allocate lazily so a zero-value ShardIteratorMap does not panic on write.
	if sim.mp == nil {
		sim.mp = make(map[string]string)
	}
	sim.mp[key] = iterator

	// Report success. This previously returned false on both paths, which made
	// the result useless for telling an insert from a rejected duplicate.
	return true
}

// Get returns the value stored under key and whether key was present.
func (sim *ShardIteratorMap) Get(key string) (string, bool) {
	sim.lock.Lock()
	defer sim.lock.Unlock()

	it, ok := sim.mp[key]
	return it, ok
}

// Delete removes key from the map. It returns true if key was present and
// removed, false if there was nothing to remove.
func (sim *ShardIteratorMap) Delete(key string) bool {
	sim.lock.Lock()
	defer sim.lock.Unlock()

	if _, exists := sim.mp[key]; !exists {
		return false
	}
	delete(sim.mp, key)
	return true
}

/*---------------------------------------*/

// FilterCkptCollection reports whether collection is the checkpoint status
// table or is matched by filter.IsFilter.
func FilterCkptCollection(collection string) bool {
	if collection == CheckpointStatusTable {
		return true
	}
	return filter.IsFilter(collection)
}

// IsStatusProcessing reports whether status is one of StatusPrepareProcess,
// StatusInProcessing or StatusWaitFather.
func IsStatusProcessing(status string) bool {
	switch status {
	case StatusPrepareProcess, StatusInProcessing, StatusWaitFather:
		return true
	default:
		return false
	}
}

// IsStatusNoNeedProcess reports whether status is StatusDone or
// StatusNoNeedProcess.
func IsStatusNoNeedProcess(status string) bool {
	switch status {
	case StatusDone, StatusNoNeedProcess:
		return true
	default:
		return false
	}
}