in tunnel/tcp_reader.go [28:76]
// Link stores the given replayers on the reader, opens one TCP listener per
// channel and starts the accept loops for the transfer and ack channels.
//
// Every channel resolves its own address from reader.listenAddress; the
// RecvAckChannel address is then shifted to the next port (configured port + 1)
// while all other channels keep the configured port.
//
// It returns the first resolve or listen error encountered. When a listen
// call fails, the listeners that were already opened by this call are closed
// before returning, so a failed Link does not leave ports bound.
func (reader *TCPReader) Link(replayer []Replayer) (err error) {
	reader.replayer = replayer

	// Resolve a separate *net.TCPAddr for each channel: the ack channel's port
	// is mutated below, so the address objects must not be shared.
	for i := 0; i != TotalQueueNum; i++ {
		reader.channel[i] = new(ListenSocket)
		reader.channel[i].addr, err = net.ResolveTCPAddr("tcp4", reader.listenAddress)
		if err != nil {
			LOG.Critical("Resolve channel listenAddress error: %s", err.Error())
			return err
		}
	}
	reader.channel[RecvAckChannel].addr.Port++

	for i := 0; i != TotalQueueNum; i++ {
		reader.channel[i].listener, err = net.ListenTCP("tcp", reader.channel[i].addr)
		if err != nil {
			LOG.Critical("Tcp reader server listen %v error: %s", reader.channel[i].addr, err.Error())
			// Release the listeners opened in earlier iterations; otherwise
			// they would stay bound even though Link reports failure.
			for j := 0; j < i; j++ {
				// Best-effort cleanup: the listen error is the one worth reporting.
				_ = reader.channel[j].listener.Close()
			}
			return err
		}
	}

	// fork listen acceptor for oplog transfer tunnel
	// NOTE(review): nimo.GoRoutineInLoop presumably re-invokes the function
	// indefinitely in its own goroutine, so each call accepts one connection —
	// confirm against the nimo package.
	nimo.GoRoutineInLoop(func() {
		socket, acceptErr := reader.channel[TransferChannel].listener.AcceptTCP()
		if acceptErr != nil {
			LOG.Warn("Server accept channel error : %s", acceptErr.Error())
			return
		}
		// Socket tuning is best-effort; failures here are intentionally ignored.
		socket.SetNoDelay(false)                // keep Nagle's algorithm enabled
		socket.SetLinger(0)                     // discard unsent data on close
		socket.SetReadBuffer(1024 * 1024 * 16) // request a 16 MiB receive buffer
		nimo.GoRoutine(func() {
			reader.recvTransfer(socket)
		})
	})

	// fork listen acceptor for ack value query tunnel
	nimo.GoRoutineInLoop(func() {
		socket, acceptErr := reader.channel[RecvAckChannel].listener.AcceptTCP()
		if acceptErr != nil {
			LOG.Warn("Server ACK accept ch error : %s", acceptErr.Error())
			return
		}
		// Socket tuning is best-effort; failures here are intentionally ignored.
		socket.SetNoDelay(true) // disable Nagle's algorithm for this channel
		socket.SetLinger(0)     // discard unsent data on close
		nimo.GoRoutine(func() {
			reader.recvGetAck(socket)
		})
	})

	return nil
}