in consumer/fsof/FSOFProcessor.php [206:304]
/**
 * Receive one complete response from the provider and return its result.
 *
 * Reads the fixed-length header (DubboParser::PACKAGE_HEDA_LEN bytes), parses
 * it, checks that the response sn equals the request sn, then keeps reading
 * until the body length announced by the header ($response->getLen()) has
 * arrived, and finally parses the body.
 *
 * @param mixed        $socket  Connection object; must provide getlasterror()
 *                              and be accepted by $this->Recv().
 * @param DubboRequest $request The request whose response is awaited.
 *
 * @return mixed The value of $response->getResult() when the status is
 *               DubboResponse::OK.
 *
 * @throws ConsumerException When a read fails (connection closed or timeout),
 *                           the header cannot be parsed, the sn does not
 *                           match, the body cannot be parsed, or the provider
 *                           answered with a non-OK status.
 */
protected function recvDataFromProvider($socket, DubboRequest $request)
{
    // Read the fixed-length packet header first.
    $fsof_data = $this->Recv($socket, DubboParser::PACKAGE_HEDA_LEN);
    if (!$fsof_data)
    {
        // getlasterror() == 0 is reported as "peer closed", anything else as a timeout.
        throw new ConsumerException(
            0 == $socket->getlasterror() ? "provider端己关闭网络连接" : "接收应答数据超时"
        );
    }

    // Parse the header.
    $response = new DubboResponse();
    $response->setFullData($fsof_data);
    $response = $this->parser->parseResponseHeader($response);
    if (!$response)
    {
        // Without this guard a failed header parse would crash below with a
        // method call on a non-object instead of a catchable ConsumerException.
        $this->logger->error("parse response header err");
        throw new ConsumerException("解析应答包头失败");
    }
    if ($response->getSn() != $request->getSn())
    {
        $this->logger->error("response sn[{$response->getSn()}] != request sn[{$request->getSn()}]");
        throw new ConsumerException("请求包中的sn非法");
    }

    // Work out how much of the body still has to be received. Count bytes with
    // strlen() rather than a truthiness test: a body fragment of "0" is falsy
    // in PHP but is still one received byte. substr() may return false on
    // older PHP versions when nothing follows the header.
    $resData = substr($response->getFullData(), DubboParser::PACKAGE_HEDA_LEN);
    $resDataLen = is_string($resData) ? strlen($resData) : 0;
    $resv_len = $response->getLen() - $resDataLen;

    if ($resv_len > 0)
    {
        // Sub-second precision start time for the overall receive deadline.
        $start_time = microtime(true);
        $recv_data = '';
        do
        {
            // Large bodies are read in chunks of at most DubboParser::MAX_RECV_LEN bytes.
            $cur_len = min($resv_len, DubboParser::MAX_RECV_LEN);
            $tmpdata = $this->Recv($socket, $cur_len);
            if (!is_string($tmpdata) || '' === $tmpdata)
            {
                throw new ConsumerException(
                    0 == $socket->getlasterror() ? "provider端己关闭网络连接" : "接收应答数据超时"
                );
            }
            $recv_data .= $tmpdata;
            // Subtract what actually arrived, not what was requested, so a
            // short read does not leave the body truncated.
            $resv_len -= strlen($tmpdata);

            // Exceeding the configured iotimeout (compared against seconds from
            // microtime(true)) is treated as a timeout, as in the original code.
            if ((microtime(true) - $start_time) > $this->iotimeout)
            {
                $this->logger->error("Multi recv {$resv_len} bytes data timeout");
                throw new ConsumerException("接收应答数据超时");
            }
        } while ($resv_len > 0);
        $response->setFullData($response->getFullData() . $recv_data);
    }

    if (!$this->parser->parseResponseBody($response))
    {
        $this->logger->error("parse response body err:".$response->__toString());
        throw new ConsumerException("未知异常");
    }
    if (DubboResponse::OK != $response->getStatus())
    {
        throw new ConsumerException($response->getErrorMsg());
    }
    return $response->getResult();
}