in collector/syncer.go [441:503]
// deserializer is the endless worker loop for the pending queue at position
// index. Each batch of raw entries received from sync.PendingQueue[index] is
// decoded into oplog.PartialLog values, wrapped into oplog.GenericOplog values
// and forwarded, as one batch, to sync.logsQueue[index].
//
// The decoding and wrapping strategies are picked once, before the loop starts,
// from conf.Options. This function never returns on its own.
func (sync *OplogSyncer) deserializer(index int) {
	fromChangeStream := conf.Options.IncrSyncMongoFetchMethod == utils.VarIncrSyncMongoFetchMethodChangeStream

	// decode turns one raw []byte entry into a partial log.
	// Default: the input is in oplog format and is unmarshalled directly.
	decode := func(data []byte) (*oplog.PartialLog, error) {
		parsed := oplog.ParsedLog{}
		err := bson.Unmarshal(data, &parsed)
		return &oplog.PartialLog{ParsedLog: parsed}, err
	}
	if fromChangeStream {
		// the input is in change stream event format: convert event -> oplog
		decode = func(data []byte) (*oplog.PartialLog, error) {
			return oplog.ConvertEvent2Oplog(data, conf.Options.IncrSyncChangeStreamWatchFullDocument)
		}
	}

	// wrap pairs the parsed log with the bytes that are sent downstream.
	// Default: the bytes read from the queue are passed through untouched.
	wrap := func(raw []byte, entry *oplog.PartialLog) *oplog.GenericOplog {
		return &oplog.GenericOplog{Raw: raw, Parsed: entry}
	}
	// change stream && !direct && !(kafka & json)
	reencode := fromChangeStream &&
		conf.Options.Tunnel != utils.VarTunnelDirect &&
		(conf.Options.Tunnel != utils.VarTunnelKafka ||
			conf.Options.TunnelMessage != utils.VarTunnelMessageJson)
	if reencode {
		// very time consuming! The incoming bytes are ignored and the parsed
		// log is marshalled again to produce the downstream payload.
		wrap = func(_ []byte, entry *oplog.PartialLog) *oplog.GenericOplog {
			encoded, err := bson.Marshal(entry.ParsedLog)
			if err != nil {
				LOG.Crashf("%s deserializer marshal[%v] failed: %v", sync, entry.ParsedLog, err)
				// NOTE(review): only reachable if Crashf returns - confirm
				return nil
			}
			return &oplog.GenericOplog{Raw: encoded, Parsed: entry}
		}
	}

	// run
	for {
		rawBatch := <-sync.PendingQueue[index]
		nimo.AssertTrue(len(rawBatch) != 0, "pending queue batch logs has zero length")

		parsedBatch := make([]*oplog.GenericOplog, len(rawBatch))
		for i, rawEntry := range rawBatch {
			entry, err := decode(rawEntry)
			if err != nil {
				LOG.Crashf("%s deserializer parse data failed[%v]", sync, err)
			}
			// remember the size of the entry as it was read from the queue
			entry.RawSize = len(rawEntry)
			parsedBatch[i] = wrap(rawEntry, entry)
		}

		// set the fetch timestamp; it is taken from the first entry of the batch
		if len(parsedBatch) > 0 {
			sync.LastFetchTs = parsedBatch[0].Parsed.Timestamp
		}
		sync.logsQueue[index] <- parsedBatch
	}
}