in nimo-shake/incr-sync/syncer.go [528:590]
// executor is the write loop of a Dispatcher. It applies every batch received
// on d.executorChan to d.targetWriter and, once that channel has been closed
// and drained, marks the shard's checkpoint as done and schedules the target
// writer to be closed.
//
// Sequence:
//  1. If the sync mode is "all" and parallel incremental sync is enabled, poll
//     the checkpoint status every 5 seconds until it reports the incr-sync
//     stage.
//  2. Call d.waitFatherShardFinished() (presumably blocks until the parent
//     shard has been fully consumed - confirm against its definition).
//  3. For each node from d.executorChan, dispatch on node.tp to
//     Insert/Update/Delete, then record metrics and the in-memory checkpoint
//     position.
//  4. After the channel is closed, persist StatusDone for this shard, set
//     d.close, and close the target writer 5 minutes later.
//
// Any write or checkpoint failure is reported through LOG.Crashf.
func (d *Dispatcher) executor() {
	if conf.Options.SyncMode == utils.SyncModeAll && conf.Options.IncrSyncParallel {
		// A dedicated writer is used only to read the global sync status; it is
		// distinct from d.ckptWriter, which is used for the per-shard update below.
		statusCkpt := checkpoint.NewWriter(conf.Options.CheckpointType, conf.Options.CheckpointAddress,
			conf.Options.CheckpointDb)
		for {
			status, err := statusCkpt.FindStatus()
			if err == nil && status == checkpoint.CheckpointStatusValueIncrSync {
				LOG.Info("%s full_sync_done, do incr sync", d.String())
				break
			}
			// Lookup errors are treated the same as "not ready yet": log and retry.
			LOG.Info("%s wait for full_sync_done[err:%v][status:%s][d.executorChan:%d]",
				d.String(), err, status, len(d.executorChan))
			time.Sleep(5 * time.Second)
		}
	}

	d.waitFatherShardFinished()

	// Runs until d.executorChan is closed by its producer.
	for node := range d.executorChan {
		LOG.Info("%s try write data with length[%v], tp[%v] approximate[%v] [d.executorChan:%d]",
			d.String(), len(node.index), node.tp, node.approximateCreationDateTime, len(d.executorChan))

		var err error
		switch node.tp {
		case EventInsert:
			err = d.targetWriter.Insert(node.operate, node.index)
		case EventMODIFY:
			err = d.targetWriter.Update(node.operate, node.index)
		case EventRemove:
			err = d.targetWriter.Delete(node.index)
		default:
			// NOTE(review): this relies on LOG.Crashf not returning; if it did,
			// the node would be counted as a success below - confirm.
			LOG.Crashf("unknown write operation[%v]", node.tp)
		}
		if err != nil {
			LOG.Crashf("execute command[%v] failed[%v]", node.tp, err)
		}

		d.metric.AddSuccess(uint64(len(node.index)))
		d.metric.AddCheckpoint(1)
		// Only the in-memory position is advanced here; this function does not
		// persist it.
		d.checkpointPosition = node.lastSequenceNumber
		d.checkpointApproximateTime = node.approximateCreationDateTime
	}

	// update checkpoint: finish
	err := d.ckptWriter.UpdateWithSet(*d.shard.Shard.ShardId, map[string]interface{}{
		checkpoint.FieldStatus: checkpoint.StatusDone,
	}, d.ns.Collection)
	if err != nil {
		LOG.Crashf("%s update checkpoint to done failed[%v]", d.String(), err)
	}

	LOG.Info("%s executor exit", d.String())
	d.close = true

	// Close the writer after a 5 minute delay. time.AfterFunc runs the callback
	// in its own goroutine when the timer fires, so no goroutine sits blocked
	// in the meantime.
	// NOTE(review): the reason for the delay is not visible in this function.
	time.AfterFunc(5*time.Minute, func() {
		d.targetWriter.Close()
		LOG.Info("%s incr sync writer close", d.String())
	})
	LOG.Info("%s close in 5 minutes", d.String())
}