// Excerpt from collector/syncer.go (lines 441-503).

// deserializer is a worker loop: it drains raw oplog batches from
// PendingQueue[index], decodes every []byte entry into an oplog, and pushes
// the decoded batch onto logsQueue[index] for the downstream stage.
func (sync *OplogSyncer) deserializer(index int) {
	fetchByChangeStream := conf.Options.IncrSyncMongoFetchMethod == utils.VarIncrSyncMongoFetchMethodChangeStream

	// decode converts one raw []byte entry into a parsed oplog.
	var decode func(input []byte) (*oplog.PartialLog, error)
	if fetchByChangeStream {
		// raw bytes hold a change stream event; convert the event into an oplog
		decode = func(input []byte) (*oplog.PartialLog, error) {
			return oplog.ConvertEvent2Oplog(input, conf.Options.IncrSyncChangeStreamWatchFullDocument)
		}
	} else {
		// raw bytes hold a plain oplog entry; unmarshal it directly
		decode = func(input []byte) (*oplog.PartialLog, error) {
			var parsed oplog.ParsedLog
			err := bson.Unmarshal(input, &parsed)
			return &oplog.PartialLog{
				ParsedLog: parsed,
			}, err
		}
	}

	// When fetching via change stream and shipping through a non-direct tunnel
	// (except kafka+json), the raw bytes are still in event format, so the
	// converted oplog must be re-marshaled to produce a correct Raw payload.
	remarshal := fetchByChangeStream &&
		conf.Options.Tunnel != utils.VarTunnelDirect &&
		!(conf.Options.Tunnel == utils.VarTunnelKafka &&
			conf.Options.TunnelMessage == utils.VarTunnelMessageJson)

	// pack bundles the raw bytes and the parsed oplog for the next stage.
	var pack func(raw []byte, log *oplog.PartialLog) *oplog.GenericOplog
	if remarshal {
		// very time consuming!
		pack = func(raw []byte, log *oplog.PartialLog) *oplog.GenericOplog {
			out, err := bson.Marshal(log.ParsedLog)
			if err != nil {
				LOG.Crashf("%s deserializer marshal[%v] failed: %v", sync, log.ParsedLog, err)
				return nil
			}
			return &oplog.GenericOplog{Raw: out, Parsed: log}
		}
	} else {
		pack = func(raw []byte, log *oplog.PartialLog) *oplog.GenericOplog {
			return &oplog.GenericOplog{Raw: raw, Parsed: log}
		}
	}

	// worker loop: one batch in, one decoded batch out
	for {
		batch := <-sync.PendingQueue[index]
		nimo.AssertTrue(len(batch) != 0, "pending queue batch logs has zero length")

		decoded := make([]*oplog.GenericOplog, 0, len(batch))
		for _, raw := range batch {
			parsed, err := decode(raw)
			if err != nil {
				LOG.Crashf("%s deserializer parse data failed[%v]", sync, err)
			}
			parsed.RawSize = len(raw)
			decoded = append(decoded, pack(raw, parsed))
		}

		// record the timestamp of the first oplog as the latest fetch point
		if len(decoded) > 0 {
			sync.LastFetchTs = decoded[0].Parsed.Timestamp
		}
		sync.logsQueue[index] <- decoded
	}
}