in tunnel/kafka_writer.go [116:191]
// encode consumes messages from tunnel.inputChan[id] until that channel is
// closed, serializes each one according to conf.Options.TunnelMessage and
// pushes the resulting payloads to tunnel.outputChan[id].
//
// Every message is tagged with MsgPersistent before serialization. Each
// payload is wrapped in an outputLog whose isEnd flag is set on the payload
// produced for the last log of the message.
//
// Supported tunnel message types:
//   - utils.VarTunnelMessageBson: every raw log is forwarded unchanged.
//   - utils.VarTunnelMessageJson: every parsed log is marshalled to JSON,
//     either with json.Marshal (empty conf.Options.TunnelJsonFormat) or with
//     bson.MarshalExtJSON ("canonical_extended_json").
//   - utils.VarTunnelMessageRaw: a big-endian binary header (checksum, tag,
//     shard, compressor, log count) followed by length-prefixed raw logs.
//
// Any other message type or JSON format is reported through LOG.Crashf.
func (tunnel *KafkaWriter) encode(id int) {
	for message := range tunnel.inputChan[id] {
		message.Tag |= MsgPersistent

		switch conf.Options.TunnelMessage {
		case utils.VarTunnelMessageBson:
			// Write the raw oplog directly. The end marker is derived from the
			// number of raw logs, not from the byte length of a single log.
			last := len(message.RawLogs) - 1
			for i, raw := range message.RawLogs {
				tunnel.outputChan[id] <- outputLog{
					isEnd: i == last,
					log:   raw,
				}
			}

		case utils.VarTunnelMessageJson:
			last := len(message.ParsedLogs) - 1
			for i, entry := range message.ParsedLogs {
				var encoded []byte
				var err error

				switch conf.Options.TunnelJsonFormat {
				case "":
					encoded, err = json.Marshal(entry.ParsedLog)
					if err != nil {
						if strings.Contains(err.Error(), "unsupported value:") {
							LOG.Error("%s json marshal data[%v] meets unsupported value[%v], skip current oplog",
								tunnel, entry.ParsedLog, err)
							// NOTE(review): when the skipped entry is the last one of
							// the message, no outputLog with isEnd set is emitted for
							// this message; confirm the reader of outputChan copes.
							continue
						}
						// Record the fault before crashing: placed after the crash
						// call, this assignment could never run if Crashf panics
						// (presumably it does - confirm against the logging library).
						tunnel.state = ReplyServerFault
						LOG.Crashf("%s json marshal data[%v] error[%v]", tunnel, entry.ParsedLog, err)
					}
				case "canonical_extended_json":
					encoded, err = bson.MarshalExtJSON(entry.ParsedLog, true, true)
					if err != nil {
						// Same ordering rationale as above.
						tunnel.state = ReplyServerFault
						LOG.Crashf("%s json marshal data[%v] error[%v]", tunnel, entry.ParsedLog, err)
					}
				default:
					LOG.Crashf("unknown tunnel.json.format[%v]", conf.Options.TunnelJsonFormat)
				}

				tunnel.outputChan[id] <- outputLog{
					isEnd: i == last,
					log:   encoded,
				}
			}

		case utils.VarTunnelMessageRaw:
			byteBuffer := bytes.NewBuffer([]byte{})
			// Fixed header, all fields big-endian uint32.
			binary.Write(byteBuffer, binary.BigEndian, uint32(message.Checksum))      // checksum
			binary.Write(byteBuffer, binary.BigEndian, uint32(message.Tag))           // tag
			binary.Write(byteBuffer, binary.BigEndian, uint32(message.Shard))         // shard
			binary.Write(byteBuffer, binary.BigEndian, uint32(message.Compress))      // compressor
			binary.Write(byteBuffer, binary.BigEndian, uint32(len(message.RawLogs))) // log count

			// Serialize logs as <length><bytes>. The end marker is derived from
			// the slice being iterated (RawLogs).
			//
			// NOTE(review): the buffer is cumulative, so every send carries the
			// header plus all logs serialized so far, while the header always
			// announces len(message.RawLogs) logs. Confirm with the consumer
			// whether a single send after the loop was intended.
			last := len(message.RawLogs) - 1
			for i, raw := range message.RawLogs {
				binary.Write(byteBuffer, binary.BigEndian, uint32(len(raw)))
				binary.Write(byteBuffer, binary.BigEndian, raw)

				tunnel.outputChan[id] <- outputLog{
					isEnd: i == last,
					log:   byteBuffer.Bytes(),
				}
			}

		default:
			// Crashf (not Crash) so the format string is actually applied, with
			// a verb for the offending message type.
			LOG.Crashf("%s unknown tunnel.message type: %v", tunnel, conf.Options.TunnelMessage)
		}
	}
}