func()

in tunnel/kafka_writer.go [193:242]


func (tunnel *KafkaWriter) writeKafka() {
	// debug
	var debugF *os.File
	var err error
	if conf.Options.IncrSyncTunnelKafkaDebug != "" {
		fileName := fmt.Sprintf("%s-%d", conf.Options.IncrSyncTunnelKafkaDebug, tunnel.PartitionId)
		if _, err := os.Stat(fileName); os.IsNotExist(err) {
			if debugF, err = os.Create(fileName); err != nil {
				LOG.Crashf("%s create kafka debug file[%v] failed: %v", tunnel, fileName, err)
			}
		} else {
			if debugF, err = os.OpenFile(fileName, os.O_RDWR, 0666); err != nil {
				LOG.Crashf("%s open kafka debug file[%v] failed: %v", tunnel, fileName, err)
			}
		}
		defer debugF.Close()
	}

	for {
		tunnel.popIdx = (tunnel.popIdx + 1) % tunnel.encoderNr
		// read chan
		for data := range tunnel.outputChan[tunnel.popIdx] {
			if unitTestWriteKafkaFlag {
				// unit test only
				unitTestWriteKafkaChan <- data.log
			} else if conf.Options.IncrSyncTunnelKafkaDebug != "" {
				if _, err = debugF.Write(data.log); err != nil {
					LOG.Crashf("%s write to kafka debug file failed: %v, input data: %s", tunnel, err, data.log)
				}
				debugF.Write([]byte{10})
			} else {
				for {
					if err = tunnel.writer.SimpleWrite(data.log); err != nil {
						LOG.Error("%s send [%v] with type[%v] error[%v]", tunnel, tunnel.RemoteAddr,
							conf.Options.TunnelMessage, err)

						tunnel.state = ReplyError
						time.Sleep(time.Second)
					} else {
						break
					}
				}
			}

			if data.isEnd {
				break
			}
		}
	}
}