func()

in proxy/protocol/dubbo/server/dubbo_conn.go [110:156]


func (this *DubboConnection) MsgRecvLoop() {
	//通知处理应答消息
	for {
		//先处理消息头
		buf := make([]byte, dubbo.HeaderLength)
		size, err := this.conn.Read(buf)
		if err != nil {
			if nerr, ok := err.(net.Error); ok && nerr.Timeout() {
				openlog.Error("Dubbo server Recv head: " + err.Error())
				continue
			}
			openlog.Error("Dubbo server Recv head: " + err.Error())
			break
		}

		if size < dubbo.HeaderLength {
			openlog.Info("Invalid msg head")
			continue
		}
		req := new(dubbo.Request)
		bodyLen := 0
		ret := this.codec.DecodeDubboReqHead(req, buf, &bodyLen)
		if ret != dubbo.Success {
			openlog.Info("Invalid msg head")
			continue
		}
		body := make([]byte, bodyLen)
		count := 0
		for {
			redBuff := body[count:]
			size, err = this.conn.Read(redBuff)

			if err != nil {
				//通知关闭连接
				openlog.Error("Recv: " + err.Error())
				goto exitloop
			}
			count += size
			if count == bodyLen {
				break
			}
		}
		this.routineMgr.Spawn(ProcessTask{this, req, body}, nil, fmt.Sprintf("ProcessTask-%d", req.GetMsgID()))
	}
exitloop:
	this.Close()
}