func()

in proxy/protocol/dubbo/client/client_conn.go [117:165]


func (this *DubboClientConnection) MsgRecvLoop() {
	//通知处理应答消息
	for {
		//先处理消息头
		buf := make([]byte, dubbo.HeaderLength)
		size, err := this.conn.Read(buf)
		if err != nil {
			if nerr, ok := err.(net.Error); ok && nerr.Timeout() {
				openlog.Error("client Recv head time err:" + err.Error())
				//time.Sleep(time.Second * 3)
				continue
			}
			openlog.Error("client Recv head err:" + err.Error())
			break
		}

		if size < dubbo.HeaderLength {
			continue
		}
		rsp := new(dubbo.DubboRsp)
		bodyLen := 0
		ret := this.codec.DecodeDubboRsqHead(rsp, buf, &bodyLen)
		if ret != dubbo.Success {
			openlog.Info("Recv DecodeDubboRsqHead failed")
			continue
		}
		body := make([]byte, bodyLen)
		count := 0
		for {
			redBuff := body[count:]
			size, err = this.conn.Read(redBuff)
			if err != nil {
				if nerr, ok := err.(net.Error); ok && nerr.Timeout() {
					continue
				}
				//通知关闭连接
				openlog.Error("Recv client body err:" + err.Error())
				goto exitloop
			}
			count += size
			if count == bodyLen {
				break
			}
		}
		this.routineMgr.Spawn(ProcessTask{this, rsp, body}, nil, fmt.Sprintf("Client ProcessTask-%d", rsp.GetID()))
	}
exitloop:
	this.Close()
}