in session.go [617:705]
func (s *session) handleTCPPackage() error {
var (
ok bool
err error
netError net.Error
conn *gettyTCPConn
exit bool
bufLen int
pkgLen int
buf []byte
pktBuf *gxbytes.Buffer
pkg interface{}
)
pktBuf = gxbytes.NewBuffer(nil)
conn = s.Connection.(*gettyTCPConn)
for {
if s.IsClosed() {
err = nil
// do not handle the left stream in pktBuf and exit asap.
// it is impossible packing a package by the left stream.
break
}
bufLen = 0
for {
// for clause for the network timeout condition check
// s.conn.SetReadTimeout(time.Now().Add(s.rTimeout))
buf = pktBuf.WriteNextBegin(maxReadBufLen)
bufLen, err = conn.recv(buf)
if err != nil {
if netError, ok = perrors.Cause(err).(net.Error); ok && netError.Timeout() {
break
}
if perrors.Cause(err) == io.EOF {
log.Infof("%s, session.conn read EOF, client send over, session exit", s.sessionToken())
err = nil
exit = true
if bufLen != 0 {
// as https://github.com/apache/dubbo-getty/issues/77#issuecomment-939652203
// this branch is impossible. Even if it happens, the bufLen will be zero and the error
// is io.EOF when getty continues to read the socket.
exit = false
log.Infof("%s, session.conn read EOF, while the bufLen(%d) is non-zero.", s.sessionToken())
}
break
}
log.Errorf("%s, [session.conn.read] = error:%+v", s.sessionToken(), perrors.WithStack(err))
exit = true
}
break
}
if 0 != bufLen {
pktBuf.WriteNextEnd(bufLen)
for {
if pktBuf.Len() <= 0 {
break
}
pkg, pkgLen, err = s.reader.Read(s, pktBuf.Bytes())
// for case 3/case 4
if err == nil && s.maxMsgLen > 0 && pkgLen > int(s.maxMsgLen) {
err = perrors.Errorf("pkgLen %d > session max message len %d", pkgLen, s.maxMsgLen)
}
// handle case 1
if err != nil {
log.Warnf("%s, [session.handleTCPPackage] = len{%d}, error:%+v",
s.sessionToken(), pkgLen, perrors.WithStack(err))
exit = true
break
}
// handle case 2/case 3
if pkg == nil {
break
}
// handle case 4
s.UpdateActive()
s.addTask(pkg)
pktBuf.Next(pkgLen)
// continue to handle case 5
}
}
if exit {
break
}
}
return perrors.WithStack(err)
}