in session.go [708:778]
func (s *session) handleUDPPackage() error {
var (
ok bool
err error
netError net.Error
conn *gettyUDPConn
bufLen int
maxBufLen int
bufp *[]byte
buf []byte
addr *net.UDPAddr
pkgLen int
pkg interface{}
)
conn = s.Connection.(*gettyUDPConn)
maxBufLen = int(s.maxMsgLen + maxReadBufLen)
if int(s.maxMsgLen<<1) < bufLen {
maxBufLen = int(s.maxMsgLen << 1)
}
bufp = gxbytes.AcquireBytes(maxBufLen)
defer gxbytes.ReleaseBytes(bufp)
buf = *bufp
for {
if s.IsClosed() {
break
}
bufLen, addr, err = conn.recv(buf)
log.Debugf("conn.read() = bufLen:%d, addr:%#v, err:%+v", bufLen, addr, perrors.WithStack(err))
if netError, ok = perrors.Cause(err).(net.Error); ok && netError.Timeout() {
continue
}
if err != nil {
log.Errorf("%s, [session.handleUDPPackage] = len:%d, error:%+v",
s.sessionToken(), bufLen, perrors.WithStack(err))
err = perrors.Wrapf(err, "conn.read()")
break
}
if bufLen == 0 {
log.Errorf("conn.read() = bufLen:%d, addr:%s, err:%+v", bufLen, addr, perrors.WithStack(err))
continue
}
if bufLen == len(connectPingPackage) && bytes.Equal(connectPingPackage, buf[:bufLen]) {
log.Infof("got %s connectPingPackage", addr)
continue
}
pkg, pkgLen, err = s.reader.Read(s, buf[:bufLen])
log.Debugf("s.reader.Read() = pkg:%#v, pkgLen:%d, err:%+v", pkg, pkgLen, perrors.WithStack(err))
if err == nil && s.maxMsgLen > 0 && bufLen > int(s.maxMsgLen) {
err = perrors.Errorf("Message Too Long, bufLen %d, session max message len %d", bufLen, s.maxMsgLen)
}
if err != nil {
log.Warnf("%s, [session.handleUDPPackage] = len:%d, error:%+v",
s.sessionToken(), pkgLen, perrors.WithStack(err))
continue
}
if pkgLen == 0 {
log.Errorf("s.reader.Read() = pkg:%#v, pkgLen:%d, err:%+v", pkg, pkgLen, perrors.WithStack(err))
continue
}
s.UpdateActive()
s.addTask(UDPContext{Pkg: pkg, PeerAddr: addr})
}
return perrors.WithStack(err)
}