tunnel/kafka_writer.go (195 lines of code) (raw):

package tunnel import ( "bytes" "encoding/binary" "encoding/json" "fmt" "math" "sync/atomic" "time" "github.com/alibaba/MongoShake/v2/collector/configure" "github.com/alibaba/MongoShake/v2/common" "github.com/alibaba/MongoShake/v2/tunnel/kafka" LOG "github.com/vinllen/log4go" "go.mongodb.org/mongo-driver/bson" "os" "strings" ) const ( inputChanSize = 256 outputChanSize = 4196 ) var ( unitTestWriteKafkaFlag = false unitTestWriteKafkaChan chan []byte ) type outputLog struct { isEnd bool log []byte } type KafkaWriter struct { RemoteAddr string PartitionId int // write to which partition writer *kafka.SyncWriter // writer state int64 // state: ok, error encoderNr int64 // how many encoder inputChan []chan *WMessage // each encoder has 1 inputChan outputChan []chan outputLog // output chan length pushIdx int64 // push into which encoder popIdx int64 // pop from which encoder } func (tunnel *KafkaWriter) Name() string { return "kafka" } func (tunnel *KafkaWriter) Prepare() bool { var writer *kafka.SyncWriter var err error if !unitTestWriteKafkaFlag && conf.Options.IncrSyncTunnelKafkaDebug == "" { writer, err = kafka.NewSyncWriter(conf.Options.TunnelMongoSslRootCaFile, tunnel.RemoteAddr, tunnel.PartitionId) if err != nil { LOG.Critical("KafkaWriter prepare[%v] create writer error[%v]", tunnel.RemoteAddr, err) return false } if err := writer.Start(); err != nil { LOG.Critical("KafkaWriter prepare[%v] start writer error[%v]", tunnel.RemoteAddr, err) return false } } tunnel.writer = writer tunnel.state = ReplyOK tunnel.encoderNr = int64(math.Max(float64(conf.Options.IncrSyncTunnelWriteThread/conf.Options.IncrSyncWorker), 1)) tunnel.inputChan = make([]chan *WMessage, tunnel.encoderNr) tunnel.outputChan = make([]chan outputLog, tunnel.encoderNr) tunnel.pushIdx = 0 tunnel.popIdx = 0 LOG.Info("%s starts: writer_thread count[%v]", tunnel, tunnel.encoderNr) // start encoder for i := 0; i < int(tunnel.encoderNr); i++ { tunnel.inputChan[i] = make(chan *WMessage, inputChanSize) tunnel.outputChan[i] = make(chan outputLog, outputChanSize) go tunnel.encode(i) } // start kafkaWriter go tunnel.writeKafka() return true } func (tunnel *KafkaWriter) Send(message *WMessage) int64 { if len(message.RawLogs) == 0 || message.Tag&MsgProbe != 0 { return 0 } encoderId := atomic.AddInt64(&tunnel.pushIdx, 1) tunnel.inputChan[encoderId%tunnel.encoderNr] <- message // for transfer() not into default branch and then endless loop return 0 } // KafkaWriter.AckRequired() is always false, return 0 directly func (tunnel *KafkaWriter) AckRequired() bool { return false } func (tunnel *KafkaWriter) ParsedLogsRequired() bool { return false } func (tunnel *KafkaWriter) String() string { return fmt.Sprintf("KafkaWriter[%v] with partitionId[%v]", tunnel.RemoteAddr, tunnel.PartitionId) } func (tunnel *KafkaWriter) encode(id int) { for message := range tunnel.inputChan[id] { message.Tag |= MsgPersistent switch conf.Options.TunnelMessage { case utils.VarTunnelMessageBson: // write the raw oplog directly for i, log := range message.RawLogs { tunnel.outputChan[id] <- outputLog{ isEnd: i == len(log)-1, log: log, } } case utils.VarTunnelMessageJson: for i, log := range message.ParsedLogs { // json marshal var encode []byte var err error if conf.Options.TunnelJsonFormat == "" { encode, err = json.Marshal(log.ParsedLog) if err != nil { if strings.Contains(err.Error(), "unsupported value:") { LOG.Error("%s json marshal data[%v] meets unsupported value[%v], skip current oplog", tunnel, log.ParsedLog, err) continue } else { // should panic LOG.Crashf("%s json marshal data[%v] error[%v]", tunnel, log.ParsedLog, err) tunnel.state = ReplyServerFault } } } else if conf.Options.TunnelJsonFormat == "canonical_extended_json" { encode, err = bson.MarshalExtJSON(log.ParsedLog, true, true) if err != nil { // should panic LOG.Crashf("%s json marshal data[%v] error[%v]", tunnel, log.ParsedLog, err) tunnel.state = ReplyServerFault } } else { LOG.Crashf("unknown tunnel.json.format[%v]", conf.Options.TunnelJsonFormat) } tunnel.outputChan[id] <- outputLog{ isEnd: i == len(message.ParsedLogs)-1, log: encode, } } case utils.VarTunnelMessageRaw: byteBuffer := bytes.NewBuffer([]byte{}) // checksum binary.Write(byteBuffer, binary.BigEndian, uint32(message.Checksum)) // tag binary.Write(byteBuffer, binary.BigEndian, uint32(message.Tag)) // shard binary.Write(byteBuffer, binary.BigEndian, uint32(message.Shard)) // compressor binary.Write(byteBuffer, binary.BigEndian, uint32(message.Compress)) // serialize log count binary.Write(byteBuffer, binary.BigEndian, uint32(len(message.RawLogs))) // serialize logs for i, log := range message.RawLogs { binary.Write(byteBuffer, binary.BigEndian, uint32(len(log))) binary.Write(byteBuffer, binary.BigEndian, log) tunnel.outputChan[id] <- outputLog{ isEnd: i == len(message.ParsedLogs)-1, log: byteBuffer.Bytes(), } } default: LOG.Crash("%s unknown tunnel.message type: ", tunnel, conf.Options.TunnelMessage) } } } func (tunnel *KafkaWriter) writeKafka() { // debug var debugF *os.File var err error if conf.Options.IncrSyncTunnelKafkaDebug != "" { fileName := fmt.Sprintf("%s-%d", conf.Options.IncrSyncTunnelKafkaDebug, tunnel.PartitionId) if _, err := os.Stat(fileName); os.IsNotExist(err) { if debugF, err = os.Create(fileName); err != nil { LOG.Crashf("%s create kafka debug file[%v] failed: %v", tunnel, fileName, err) } } else { if debugF, err = os.OpenFile(fileName, os.O_RDWR, 0666); err != nil { LOG.Crashf("%s open kafka debug file[%v] failed: %v", tunnel, fileName, err) } } defer debugF.Close() } for { tunnel.popIdx = (tunnel.popIdx + 1) % tunnel.encoderNr // read chan for data := range tunnel.outputChan[tunnel.popIdx] { if unitTestWriteKafkaFlag { // unit test only unitTestWriteKafkaChan <- data.log } else if conf.Options.IncrSyncTunnelKafkaDebug != "" { if _, err = debugF.Write(data.log); err != nil { LOG.Crashf("%s write to kafka debug file failed: %v, input data: %s", tunnel, err, data.log) } debugF.Write([]byte{10}) } else { for { if err = tunnel.writer.SimpleWrite(data.log); err != nil { LOG.Error("%s send [%v] with type[%v] error[%v]", tunnel, tunnel.RemoteAddr, conf.Options.TunnelMessage, err) tunnel.state = ReplyError time.Sleep(time.Second) } else { break } } } if data.isEnd { break } } } }