in tunnel/kafka_writer.go [193:242]
func (tunnel *KafkaWriter) writeKafka() {
// debug
var debugF *os.File
var err error
if conf.Options.IncrSyncTunnelKafkaDebug != "" {
fileName := fmt.Sprintf("%s-%d", conf.Options.IncrSyncTunnelKafkaDebug, tunnel.PartitionId)
if _, err := os.Stat(fileName); os.IsNotExist(err) {
if debugF, err = os.Create(fileName); err != nil {
LOG.Crashf("%s create kafka debug file[%v] failed: %v", tunnel, fileName, err)
}
} else {
if debugF, err = os.OpenFile(fileName, os.O_RDWR, 0666); err != nil {
LOG.Crashf("%s open kafka debug file[%v] failed: %v", tunnel, fileName, err)
}
}
defer debugF.Close()
}
for {
tunnel.popIdx = (tunnel.popIdx + 1) % tunnel.encoderNr
// read chan
for data := range tunnel.outputChan[tunnel.popIdx] {
if unitTestWriteKafkaFlag {
// unit test only
unitTestWriteKafkaChan <- data.log
} else if conf.Options.IncrSyncTunnelKafkaDebug != "" {
if _, err = debugF.Write(data.log); err != nil {
LOG.Crashf("%s write to kafka debug file failed: %v, input data: %s", tunnel, err, data.log)
}
debugF.Write([]byte{10})
} else {
for {
if err = tunnel.writer.SimpleWrite(data.log); err != nil {
LOG.Error("%s send [%v] with type[%v] error[%v]", tunnel, tunnel.RemoteAddr,
conf.Options.TunnelMessage, err)
tunnel.state = ReplyError
time.Sleep(time.Second)
} else {
break
}
}
}
if data.isEnd {
break
}
}
}
}