func()

in executor/operation.go [54:146]


func (exec *Executor) execute(group *OplogsGroup) error {
	count := uint64(len(group.oplogRecords))
	if count == 0 {
		// probe
		return nil
	}

	lastOne := group.oplogRecords[count-1]

	if !conf.Options.IncrSyncExecutorDebug {
		if !exec.ensureConnection() {
			return fmt.Errorf("Replay-%d network connection lost . we would retry for next connecting",
				exec.batchExecutor.ReplayerId)
		}
		// just use the first log. they have the same metadata
		// zhongli: ???
		metadata := buildMetadata(group.oplogRecords[0].original.partialLog)
		hasIndex := strings.Contains(group.ns, "system.indexes")
		// LOG.Debug("fullFinishTs: %v", utils.ExtractTimestampForLog(exec.batchExecutor.FullFinishTs))
		dbWriter := NewDbWriter(exec.conn, metadata, exec.bulkInsert && !hasIndex, exec.batchExecutor.FullFinishTs)
		var err error

		LOG.Debug("Replay-%d oplog collection ns [%s] with command [%s] batch count %d, metadata %v",
			exec.batchExecutor.ReplayerId, group.ns, strings.ToUpper(lookupOpName(group.op)), count, metadata)

		/*
		 * in the former version, we filter DDL here. But in current version, all messages that need filter
		 * have already removed in the collector(syncer). So here, we only need to write all oplogs.
		 */
		// for indexes
		// "0" -> database, "1" -> collection
		dc := strings.SplitN(group.ns, ".", 2)
		switch group.op {
		case "i":
			err = dbWriter.doInsert(dc[0], dc[1], metadata, group.oplogRecords,
				conf.Options.IncrSyncExecutorInsertOnDupUpdate)
			atomic.AddUint64(&exec.metricInsert, uint64(len(group.oplogRecords)))
			exec.addNsMapMetric(group.ns, "i", len(group.oplogRecords))
		case "u":
			err = dbWriter.doUpdate(dc[0], dc[1], metadata, group.oplogRecords,
				conf.Options.IncrSyncExecutorUpsert)
			atomic.AddUint64(&exec.metricUpdate, uint64(len(group.oplogRecords)))
			exec.addNsMapMetric(group.ns, "u", len(group.oplogRecords))
		case "d":
			err = dbWriter.doDelete(dc[0], dc[1], metadata, group.oplogRecords)
			atomic.AddUint64(&exec.metricDelete, uint64(len(group.oplogRecords)))
			exec.addNsMapMetric(group.ns, "d", len(group.oplogRecords))
		case "c":
			LOG.Info("Replay-%d run DDL with metadata[%v] in db[%v], firstLog: %v", exec.batchExecutor.ReplayerId,
				dc[0], metadata, group.oplogRecords[0].original.partialLog)
			err = dbWriter.doCommand(dc[0], metadata, group.oplogRecords)
			atomic.AddUint64(&exec.metricDDL, uint64(len(group.oplogRecords)))
			exec.addNsMapMetric(group.ns, "c", len(group.oplogRecords))
		case "n":
			// exec.batchExecutor.ReplMetric.AddFilter(count)
			atomic.AddUint64(&exec.metricNoop, uint64(len(group.oplogRecords)))
			exec.addNsMapMetric(group.ns, "n", len(group.oplogRecords))
		default:
			atomic.AddUint64(&exec.metricUnknown, uint64(len(group.oplogRecords)))
			LOG.Warn("Replay-%d meets unknown type oplogs found. op '%s'", exec.batchExecutor.ReplayerId, group.op)
			exec.addNsMapMetric(group.ns, "x", len(group.oplogRecords))
		}

		// a few known error we can skip !! such as "ShardKeyNotFound" returned
		// if mongoshake connected to MongoS
		if exec.errorIgnore(err) {
			LOG.Info("Replay-%d Discard known error[%v], It's acceptable", exec.batchExecutor.ReplayerId, err)
			err = nil
		}

		if err != nil {
			LOG.Critical("Replayer-%d, executor-%d, oplog for namespace[%s] op[%s] failed. error type[%v]"+
				" error[%v], logs number[%d], firstLog: %s",
				exec.batchExecutor.ReplayerId, exec.id, group.ns, group.op, reflect.TypeOf(err), err.Error(), count,
				group.oplogRecords[0].original.partialLog)
			exec.dropConnection()
			atomic.AddUint64(&exec.metricError, uint64(len(group.oplogRecords)))
			exec.addNsMapMetric(group.ns, "e", len(group.oplogRecords))

			return err
		}
	}
	// exec.batchExecutor.ReplMetric.ReplStatus.Clear(utils.ReplicaExecBad)
	// wait for conflict break point if need
	if lastOne.wait != nil {
		lastOne.wait()
	}

	// group logs have the equivalent namespace
	//ns := group.logs[0].original.partialLog.Namespace
	//exec.replayer.ReplMetric.AddTableOps(ns, count)
	return nil
}