protected function recvDataFromProvider()

in consumer/fsof/FSOFProcessor.php [206:304]


    /**
     * Receives one response packet from the provider and returns its decoded result.
     *
     * Steps:
     *  1. Read the fixed-size header (DubboParser::PACKAGE_HEDA_LEN bytes) and parse it.
     *  2. Check that the response sn matches the sn of $request.
     *  3. Read the rest of the body in chunks of at most DubboParser::MAX_RECV_LEN bytes
     *     (the original note describes the chunk size as 1 MB) until getLen() body bytes
     *     are available.
     *  4. Parse the body and return the result, or raise if the status is not OK.
     *
     * @param mixed        $socket  Connection handle passed to $this->Recv(); must expose getlasterror().
     * @param DubboRequest $request Request whose response is being awaited.
     *
     * @return mixed Whatever DubboResponse::getResult() yields for an OK response.
     *
     * @throws ConsumerException When the peer closed the connection, a read timed out,
     *                           the header or body cannot be parsed, the sn does not match,
     *                           or the provider answered with a non-OK status.
     */
    protected function recvDataFromProvider($socket, DubboRequest $request)
    {
        $fsof_data = $this->Recv($socket, DubboParser::PACKAGE_HEDA_LEN);
        if (!$fsof_data)
        {
            // A failed read with last error 0 is reported as "peer closed"; anything else as a timeout.
            throw new ConsumerException(
                (0 == $socket->getlasterror()) ? "provider端己关闭网络连接" : "接收应答数据超时"
            );
        }

        // Parse the header.
        $response = new DubboResponse();
        $response->setFullData($fsof_data);
        $response = $this->parser->parseResponseHeader($response);
        if (!$response)
        {
            // The parser signalled failure with a falsy value; stop here instead of
            // dereferencing it below, which would be a fatal error.
            $this->logger->error("parse response header err");
            throw new ConsumerException("未知异常");
        }
        if ($response->getSn() != $request->getSn())
        {
            $this->logger->error("response sn[{$response->getSn()}] != request sn[{$request->getSn()}]");
            throw new ConsumerException("请求包中的sn非法");
        }

        // Body bytes that already arrived together with the header. substr() may return
        // false (PHP < 8) or '' when nothing follows the header; measure by type rather
        // than truthiness so a body prefix of "0" is still counted as one byte.
        $resData = substr($response->getFullData(), DubboParser::PACKAGE_HEDA_LEN);
        $resDataLen = is_string($resData) ? strlen($resData) : 0;

        // Receive the remaining part of the message body.
        $resv_len = $response->getLen() - $resDataLen;
        if ($resv_len > 0)
        {
            // Microsecond-resolution start time for the overall receive budget;
            // $this->iotimeout is compared against a difference expressed in seconds.
            $start_time = microtime(true);
            $recv_data = '';
            do
            {
                // Never request more than MAX_RECV_LEN bytes in a single call.
                $cur_len = min(DubboParser::MAX_RECV_LEN, $resv_len);
                $tmpdata = $this->Recv($socket, $cur_len);
                if (!is_string($tmpdata) || '' === $tmpdata)
                {
                    throw new ConsumerException(
                        (0 == $socket->getlasterror()) ? "provider端己关闭网络连接" : "接收应答数据超时"
                    );
                }

                $recv_data .= $tmpdata;
                // Account for the bytes actually received, so a short read cannot
                // silently truncate the body.
                $resv_len -= strlen($tmpdata);

                // Treat the transfer as timed out once the overall budget is exceeded,
                // but only while data is still missing: a complete body is not discarded.
                if (($resv_len > 0) && ((microtime(true) - $start_time) > $this->iotimeout))
                {
                    $this->logger->error("Multi recv {$resv_len} bytes data timeout");
                    throw new ConsumerException("接收应答数据超时");
                }
            } while ($resv_len > 0);

            $response->setFullData($response->getFullData() . $recv_data);
        }

        if (!$this->parser->parseResponseBody($response))
        {
            $this->logger->error("parse response body err:".$response->__toString());
            throw new ConsumerException("未知异常");
        }

        if (DubboResponse::OK != $response->getStatus())
        {
            throw new ConsumerException($response->getErrorMsg());
        }

        return $response->getResult();
    }