in proxy/protocol/dubbo/server/dubbo_conn.go [110:156]
// MsgRecvLoop reads Dubbo request frames from the connection until a
// non-recoverable read error occurs, then closes the connection.
//
// Each iteration reads a fixed-size header of dubbo.HeaderLength bytes,
// decodes it with the connection's codec to obtain the body length, reads
// exactly that many body bytes, and hands the request and body to the routine
// manager as a ProcessTask.
func (this *DubboConnection) MsgRecvLoop() {
	// Every way out of the receive loop must tear the connection down.
	defer this.Close()

	for {
		// Read the message header first. A single Read on a stream connection
		// may return fewer bytes than requested, so keep reading until the
		// whole header has arrived; discarding a partial header would leave
		// the stream misaligned for every following frame.
		head := make([]byte, dubbo.HeaderLength)
		for got := 0; got < dubbo.HeaderLength; {
			n, err := this.conn.Read(head[got:])
			got += n
			if err != nil {
				openlog.Error("Dubbo server Recv head: " + err.Error())
				if nerr, ok := err.(net.Error); ok && nerr.Timeout() {
					// A timeout is not fatal: retry, keeping the bytes
					// already received for this header.
					continue
				}
				return
			}
		}

		req := new(dubbo.Request)
		bodyLen := 0
		ret := this.codec.DecodeDubboReqHead(req, head, &bodyLen)
		// A negative length would make the allocation below panic, so it is
		// treated exactly like a header the codec rejected.
		if ret != dubbo.Success || bodyLen < 0 {
			openlog.Info("Invalid msg head")
			continue
		}

		// Read exactly bodyLen bytes of payload; any error here closes the
		// connection.
		body := make([]byte, bodyLen)
		for got := 0; got < bodyLen; {
			n, err := this.conn.Read(body[got:])
			if err != nil {
				openlog.Error("Recv: " + err.Error())
				return
			}
			got += n
		}

		// Hand the decoded request to the routine manager for processing.
		this.routineMgr.Spawn(ProcessTask{this, req, body}, nil, fmt.Sprintf("ProcessTask-%d", req.GetMsgID()))
	}
}