func()

in nimo-shake/incr-sync/syncer.go [528:590]


func (d *Dispatcher) executor() {
	if conf.Options.SyncMode == utils.SyncModeAll && conf.Options.IncrSyncParallel == true {
		ckptWriter := checkpoint.NewWriter(conf.Options.CheckpointType, conf.Options.CheckpointAddress,
			conf.Options.CheckpointDb)
		for {
			status, err := ckptWriter.FindStatus()

			if err != nil || status != checkpoint.CheckpointStatusValueIncrSync {
				LOG.Info("%s wait for full_sync_done[err:%v][status:%s][d.executorChan:%d]",
					d.String(), err, status, len(d.executorChan))
				time.Sleep(5 * time.Second)
			} else {
				LOG.Info("%s full_sync_done, do incr sync", d.String())
				break
			}
		}
	}

	d.waitFatherShardFinished()

	for node := range d.executorChan {
		LOG.Info("%s try write data with length[%v], tp[%v] approximate[%v] [d.executorChan:%d]",
			d.String(), len(node.index), node.tp, node.approximateCreationDateTime, len(d.executorChan))
		var err error
		switch node.tp {
		case EventInsert:
			err = d.targetWriter.Insert(node.operate, node.index)
		case EventMODIFY:
			err = d.targetWriter.Update(node.operate, node.index)
		case EventRemove:
			err = d.targetWriter.Delete(node.index)
		default:
			LOG.Crashf("unknown write operation[%v]", node.tp)
		}

		if err != nil {
			LOG.Crashf("execute command[%v] failed[%v]", node.tp, err)
		}

		d.metric.AddSuccess(uint64(len(node.index)))
		d.metric.AddCheckpoint(1)
		d.checkpointPosition = node.lastSequenceNumber
		d.checkpointApproximateTime = node.approximateCreationDateTime
	}

	// update checkpoint: finish
	err := d.ckptWriter.UpdateWithSet(*d.shard.Shard.ShardId, map[string]interface{}{
		checkpoint.FieldStatus: checkpoint.StatusDone,
	}, d.ns.Collection)
	if err != nil {
		LOG.Crashf("%s update checkpoint to done failed[%v]", d.String(), err)
	}

	LOG.Info("%s executor exit", d.String())

	d.close = true
	go func() {
		<-time.NewTimer(time.Minute * 5).C
		d.targetWriter.Close()
		LOG.Info("%s incr sync writer close", d.String())
	}()
	LOG.Info("%s close in 5 minutes", d.String())
}