func()

in tunnel/kafka_writer.go [116:191]


func (tunnel *KafkaWriter) encode(id int) {
	for message := range tunnel.inputChan[id] {
		message.Tag |= MsgPersistent

		switch conf.Options.TunnelMessage {
		case utils.VarTunnelMessageBson:
			// write the raw oplog directly
			for i, log := range message.RawLogs {
				tunnel.outputChan[id] <- outputLog{
					isEnd: i == len(log)-1,
					log:   log,
				}
			}
		case utils.VarTunnelMessageJson:
			for i, log := range message.ParsedLogs {
				// json marshal
				var encode []byte
				var err error
				if conf.Options.TunnelJsonFormat == "" {
					encode, err = json.Marshal(log.ParsedLog)
					if err != nil {
						if strings.Contains(err.Error(), "unsupported value:") {
							LOG.Error("%s json marshal data[%v] meets unsupported value[%v], skip current oplog",
								tunnel, log.ParsedLog, err)
							continue
						} else {
							// should panic
							LOG.Crashf("%s json marshal data[%v] error[%v]", tunnel, log.ParsedLog, err)
							tunnel.state = ReplyServerFault
						}
					}
				} else if conf.Options.TunnelJsonFormat == "canonical_extended_json" {
					encode, err = bson.MarshalExtJSON(log.ParsedLog, true, true)
					if err != nil {
						// should panic
						LOG.Crashf("%s json marshal data[%v] error[%v]", tunnel, log.ParsedLog, err)
						tunnel.state = ReplyServerFault
					}
				} else {
					LOG.Crashf("unknown tunnel.json.format[%v]", conf.Options.TunnelJsonFormat)
				}

				tunnel.outputChan[id] <- outputLog{
					isEnd: i == len(message.ParsedLogs)-1,
					log:   encode,
				}
			}
		case utils.VarTunnelMessageRaw:
			byteBuffer := bytes.NewBuffer([]byte{})
			// checksum
			binary.Write(byteBuffer, binary.BigEndian, uint32(message.Checksum))
			// tag
			binary.Write(byteBuffer, binary.BigEndian, uint32(message.Tag))
			// shard
			binary.Write(byteBuffer, binary.BigEndian, uint32(message.Shard))
			// compressor
			binary.Write(byteBuffer, binary.BigEndian, uint32(message.Compress))
			// serialize log count
			binary.Write(byteBuffer, binary.BigEndian, uint32(len(message.RawLogs)))

			// serialize logs
			for i, log := range message.RawLogs {
				binary.Write(byteBuffer, binary.BigEndian, uint32(len(log)))
				binary.Write(byteBuffer, binary.BigEndian, log)

				tunnel.outputChan[id] <- outputLog{
					isEnd: i == len(message.ParsedLogs)-1,
					log:   byteBuffer.Bytes(),
				}
			}

		default:
			LOG.Crash("%s unknown tunnel.message type: ", tunnel, conf.Options.TunnelMessage)
		}
	}
}