in proxy/protocol/dubbo/client/client_conn.go [117:165]
// MsgRecvLoop reads Dubbo response frames from the connection in a loop and
// closes the connection once a non-timeout read error (or an invalid frame
// length) is encountered.
//
// Each frame consists of a header of dubbo.HeaderLength bytes followed by a
// body whose length is decoded from that header. Every complete frame is
// handed to the routine manager as a ProcessTask.
func (this *DubboClientConnection) MsgRecvLoop() {
	// fill blocks until dst has been filled completely. A single Read on a
	// stream connection may return fewer bytes than requested, so bytes are
	// accumulated across calls; discarding a partial read would leave the
	// stream positioned in the middle of a frame. Read timeouts are retried
	// (optionally logged) without losing the bytes received so far. Any other
	// error is returned to the caller.
	fill := func(dst []byte, logTimeout bool) error {
		for got := 0; got < len(dst); {
			n, err := this.conn.Read(dst[got:])
			// Count delivered bytes before looking at err: a Read may return
			// data together with an error.
			got += n
			if err == nil {
				continue
			}
			if nerr, ok := err.(net.Error); ok && nerr.Timeout() {
				if logTimeout {
					openlog.Error("client Recv head time err:" + err.Error())
				}
				continue
			}
			return err
		}
		return nil
	}

	for {
		// Process the message header first.
		head := make([]byte, dubbo.HeaderLength)
		if err := fill(head, true); err != nil {
			openlog.Error("client Recv head err:" + err.Error())
			break
		}

		rsp := new(dubbo.DubboRsp)
		bodyLen := 0
		if ret := this.codec.DecodeDubboRsqHead(rsp, head, &bodyLen); ret != dubbo.Success {
			// NOTE(review): the body of the rejected frame is not skipped
			// here (same as before); confirm whether the connection should
			// be dropped or resynchronised in this case.
			openlog.Info("Recv DecodeDubboRsqHead failed")
			continue
		}
		if bodyLen < 0 {
			// A negative length would make the allocation below panic; treat
			// it as a corrupted stream and close the connection.
			openlog.Error(fmt.Sprintf("Recv client body err: invalid body length %d", bodyLen))
			break
		}

		body := make([]byte, bodyLen)
		if err := fill(body, false); err != nil {
			// The connection is unusable; leave the loop so it gets closed.
			openlog.Error("Recv client body err:" + err.Error())
			break
		}

		// Hand the complete response over for processing.
		this.routineMgr.Spawn(ProcessTask{this, rsp, body}, nil, fmt.Sprintf("Client ProcessTask-%d", rsp.GetID()))
	}
	this.Close()
}