tunnel/tcp_reader.go (121 lines of code) (raw):
package tunnel
import (
"bytes"
"encoding/binary"
"io"
"net"
nimo "github.com/gugemichael/nimo4go"
LOG "github.com/vinllen/log4go"
)
// TCPReader is the receiving side of the TCP tunnel. Link opens one listener
// per channel, hands every received message to a Replayer and answers ack
// queries with the most recent value returned by a Replayer.
type TCPReader struct {
// listenAddress is the address Link resolves for every channel; the ack
// channel listens on the same address with the port incremented by one.
listenAddress string
// channel holds one listening socket per channel, indexed by
// TransferChannel and RecvAckChannel.
channel [2]*ListenSocket
// replayer is the set of consumers supplied to Link; incoming messages are
// dispatched to replayer[message.Shard].
replayer []Replayer
// ack is the last value returned by Replayer.Sync; it is sent back to
// peers that query the ack channel.
// NOTE(review): written by transfer goroutines and read by ack goroutines
// without synchronization — confirm whether this needs atomic access.
ack int64
}
// ListenSocket pairs a resolved TCP address with the listener bound to it.
type ListenSocket struct {
// addr is the address resolved from TCPReader.listenAddress.
addr *net.TCPAddr
// listener is the listening socket created by net.ListenTCP on addr.
listener *net.TCPListener
}
// Link registers the replayers that will consume incoming messages, binds the
// transfer listener on listenAddress and the ack listener on the next port,
// and starts one accept loop per listener.
//
// It returns the resolve or listen error, if any. When listening fails, every
// listener that was already opened by this call is closed again so the ports
// are not left bound after an unsuccessful Link.
func (reader *TCPReader) Link(replayer []Replayer) (err error) {
	reader.replayer = replayer

	// Resolve the same configured address once per channel so that each
	// channel owns its own *net.TCPAddr (the ack one is modified below).
	for i := 0; i != TotalQueueNum; i++ {
		reader.channel[i] = new(ListenSocket)
		reader.channel[i].addr, err = net.ResolveTCPAddr("tcp4", reader.listenAddress)
		if err != nil {
			LOG.Critical("Resolve channel listenAddress error: %s", err.Error())
			return err
		}
	}

	// The ack channel listens on the port right after the transfer port.
	reader.channel[RecvAckChannel].addr.Port++

	for i := 0; i != TotalQueueNum; i++ {
		reader.channel[i].listener, err = net.ListenTCP("tcp", reader.channel[i].addr)
		if err != nil {
			LOG.Critical("Tcp reader server listen %v error: %s", reader.channel[i].addr, err.Error())
			// Release the listeners opened before the failing one; otherwise
			// their ports stay bound although Link reports an error.
			for j := 0; j < i; j++ {
				if closeErr := reader.channel[j].listener.Close(); closeErr != nil {
					LOG.Warn("Tcp reader close listener %v error: %s", reader.channel[j].addr, closeErr.Error())
				}
				reader.channel[j].listener = nil
			}
			return err
		}
	}

	// fork listen acceptor for oplog transfer tunnel
	nimo.GoRoutineInLoop(func() {
		socket, err := reader.channel[TransferChannel].listener.AcceptTCP()
		if err != nil {
			LOG.Warn("Server accept channel error : %s", err.Error())
			return
		}
		// Socket tuning is best-effort: a failure here does not prevent the
		// connection from being served, so the errors are deliberately ignored.
		socket.SetNoDelay(false)
		socket.SetLinger(0)
		socket.SetReadBuffer(1024 * 1024 * 16)

		// Serve the connection on its own goroutine so the acceptor can
		// immediately wait for the next peer.
		nimo.GoRoutine(func() {
			reader.recvTransfer(socket)
		})
	})

	// fork listen acceptor for ack value query tunnel
	nimo.GoRoutineInLoop(func() {
		socket, err := reader.channel[RecvAckChannel].listener.AcceptTCP()
		if err != nil {
			LOG.Warn("Server ACK accept ch error : %s", err.Error())
			return
		}
		// Best-effort tuning, see above.
		socket.SetNoDelay(true)
		socket.SetLinger(0)

		nimo.GoRoutine(func() {
			reader.recvGetAck(socket)
		})
	})

	return nil
}
// recvTransfer serves one transfer connection until a read fails, a header
// cannot be decoded or the peer sends an unexpected packet. Every iteration
// reads exactly one packet (fixed-size header followed by packet.length bytes
// of payload), decodes it into a TMessage and hands it to the replayer
// selected by the message's shard. The socket is closed on return.
//
// NOTE(review): reader.ack is assigned here and read in recvGetAck from a
// different goroutine without synchronization.
func (reader *TCPReader) recvTransfer(socket *net.TCPConn) {
	defer socket.Close()

	// The header buffer is reused for every packet on this connection.
	header := [HeaderLen]byte{}

	for {
		// Refresh the socket timeout before each packet.
		socketTimeout(socket, NetworkDefaultTimeout*10)

		// Read until the entire header has arrived.
		if _, err := io.ReadFull(socket, header[:]); err != nil {
			LOG.Warn("Server transfer read header at least failed readAtLeast %d, %s",
				HeaderLen, err.Error())
			return
		}

		packet := NewPacketV1(PacketIncomplete, nil)
		if !packet.decodeHeader(header[:]) {
			LOG.Warn("Server transfer decode header failed")
			return
		}

		// The header comes from the remote peer, so an unexpected type or an
		// empty payload only drops this connection instead of tripping a
		// process-wide assertion.
		if packet.typeOf != PacketWrite || packet.length == 0 {
			LOG.Warn("Server transfer receive bad type packet, type %v length %v",
				packet.typeOf, packet.length)
			return
		}

		payload := make([]byte, packet.length)
		if _, err := io.ReadFull(socket, payload); err != nil {
			LOG.Warn("Server transfer read packet at least failed readAtLeast %d, %s",
				packet.length, err.Error())
			return
		}

		message := new(TMessage)
		message.FromBytes(payload, binary.BigEndian)

		// Without any replayer the modulo below would divide by zero.
		replayerNum := uint32(len(reader.replayer))
		if replayerNum == 0 {
			LOG.Warn("Server transfer has no replayer to dispatch to")
			return
		}

		// Hash to the corresponding replayer, re-sharding when the sender uses
		// more shards than there are replayers (a no-op for in-range shards).
		message.Shard %= replayerNum

		reader.ack = reader.replayer[message.Shard].Sync(message, nil)
	}
}
// recvGetAck serves one ack-query connection. Every iteration reads a single
// header-only PacketGetACK request and answers with a PacketReturnACK packet
// whose payload is the current reader.ack encoded as a big-endian int64. The
// loop ends, and the socket is closed, as soon as a read, decode, validation
// or write step fails.
//
// NOTE(review): reader.ack is read here while recvTransfer assigns it from a
// different goroutine without synchronization.
func (reader *TCPReader) recvGetAck(socket *net.TCPConn) {
	defer socket.Close()

	// The header buffer is reused for every request on this connection.
	header := [HeaderLen]byte{}

	for {
		// Refresh the socket timeout before each request.
		socketTimeout(socket, NetworkDefaultTimeout)

		// Read until the entire header has arrived.
		if _, err := io.ReadFull(socket, header[:]); err != nil {
			LOG.Warn("Server ack read header at least failed readAtLeast %d, %s",
				HeaderLen, err.Error())
			return
		}

		packet := NewPacketV1(PacketIncomplete, nil)
		if !packet.decodeHeader(header[:]) {
			LOG.Warn("Server ack decode header failed")
			return
		}

		// The request comes from the remote peer, so an unexpected packet only
		// drops this connection instead of tripping a process-wide assertion.
		if packet.typeOf != PacketGetACK || packet.length != 0 {
			LOG.Warn("Server ack receive bad type packet, type %v length %v",
				packet.typeOf, packet.length)
			return
		}

		// write back ack
		buffer := &bytes.Buffer{}
		if err := binary.Write(buffer, binary.BigEndian, reader.ack); err != nil {
			LOG.Warn("Server ack encode ack value failed, %s", err.Error())
			return
		}

		packet = NewPacketV1(PacketReturnACK, buffer.Bytes())
		if _, err := socket.Write(packet.encode()); err != nil {
			if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
				LOG.Warn("Tcp ack send ack back timeout")
			} else {
				LOG.Warn("Tcp ack send ack back failed, %s", err.Error())
			}
			// After a failed (possibly partial) write the reply stream can no
			// longer be trusted, so stop serving this connection.
			return
		}

		// Sanity check on the locally built reply packet.
		nimo.AssertTrue(packet.length != 0, "ack send bad PacketReturnACK packet len")
	}
}