func()

in tunnel/tcp_reader.go [28:76]


func (reader *TCPReader) Link(replayer []Replayer) (err error) {
	reader.replayer = replayer
	for i := 0; i != TotalQueueNum; i++ {
		reader.channel[i] = new(ListenSocket)
		reader.channel[i].addr, err = net.ResolveTCPAddr("tcp4", reader.listenAddress)
		if err != nil {
			LOG.Critical("Resolve channel listenAddress error: %s", err.Error())
			return err
		}
	}
	reader.channel[RecvAckChannel].addr.Port = reader.channel[RecvAckChannel].addr.Port + 1
	for i := 0; i != TotalQueueNum; i++ {
		reader.channel[i].listener, err = net.ListenTCP("tcp", reader.channel[i].addr)
		if err != nil {
			LOG.Critical("Tcp reader server listen %v error: %s", reader.channel[i].addr, err.Error())
			return err
		}
	}

	// fork listen acceptor for oplog transfer tunnel
	nimo.GoRoutineInLoop(func() {
		socket, err := reader.channel[TransferChannel].listener.AcceptTCP()
		if err != nil {
			LOG.Warn("Server accept channel error : %s", err.Error())
			return
		}
		socket.SetNoDelay(false)
		socket.SetLinger(0)
		socket.SetReadBuffer(1024 * 1024 * 16)
		nimo.GoRoutine(func() {
			reader.recvTransfer(socket)
		})
	})

	// fork listen acceptor for ack value query tunnel
	nimo.GoRoutineInLoop(func() {
		socket, err := reader.channel[RecvAckChannel].listener.AcceptTCP()
		if err != nil {
			LOG.Warn("Server ACK accept ch error : %s", err.Error())
			return
		}
		socket.SetNoDelay(true)
		socket.SetLinger(0)
		nimo.GoRoutine(func() {
			reader.recvGetAck(socket)
		})
	})
	return nil
}