consumer/fsof/FSOFProcessor.php (287 lines of code) (raw):

<?php
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
namespace com\fenqile\fsof\consumer\fsof;

use com\fenqile\fsof\common\config\FSOFConstants;
use com\fenqile\fsof\common\protocol\fsof\DubboParser;
use com\fenqile\fsof\common\protocol\fsof\DubboRequest;
use com\fenqile\fsof\common\protocol\fsof\DubboResponse;
use com\fenqile\fsof\consumer\client\FSOFClient4Linux;
use com\fenqile\fsof\consumer\ConsumerException;

/**
 * Executes one Dubbo request against one of several candidate providers:
 * picks a provider at random, connects, sends the packed request, reads the
 * response (header first, then the body in bounded chunks) and returns the
 * decoded result.
 */
class FSOFProcessor
{
    // Error codes compared against FSOFClient4Linux::getlasterror() and
    // exception codes. The values correspond to the Linux errno numbers
    // ECONNRESET / ETIMEDOUT / EINPROGRESS / ECONNREFUSED.
    const FSOF_CONNECTION_RESET = 104;
    const FSOF_ETIMEOUT = 110;
    const FSOF_EINPROGRESS = 115;
    const FSOF_ECONNREFUSED = 111;

    /** @var DubboParser packs requests and parses responses */
    protected $parser;

    /** @var \Logger */
    private $logger;

    /** @var int|float I/O budget of the current request, compared against microtime() deltas (seconds) */
    private $iotimeout = 3;

    /**
     * Error code of the most recent failed connect attempt, 0 when the last
     * attempt did not report one. Declared explicitly (it used to be created
     * dynamically) and kept non-private so subclasses that override
     * connectProvider() can still set it.
     *
     * @var int
     */
    protected $lastErrorNo = 0;

    public function __construct()
    {
        $this->logger = \Logger::getLogger(__CLASS__);
        $this->parser = new DubboParser();
    }

    /**
     * Sends $request to one of the providers in $svrAddr and returns the result.
     *
     * @param DubboRequest $request      request to send; host, port, group, version,
     *                                   timeout and serialization are filled in from
     *                                   the chosen provider URL
     * @param array        $svrAddr      candidate provider URL objects
     * @param int|float    $ioTimeOut    connect / receive timeout
     * @param string       $providerAddr out: "host:port" of the last provider tried
     *
     * @return mixed whatever DubboResponse::getResult() yields for the response
     *
     * @throws ConsumerException when no connection could be established, the
     *                           request could not be sent, or the response is
     *                           missing, late, mismatched or reports an error
     */
    public function executeRequest(DubboRequest $request, $svrAddr, $ioTimeOut, &$providerAddr)
    {
        $this->iotimeout = $ioTimeOut;

        // Work on a gap-free, 0-based list so that a random index always
        // refers to an existing provider.
        $candidates = is_array($svrAddr) ? array_values(array_filter($svrAddr)) : array();

        $client = $this->connectToAnyProvider($request, $candidates, $providerAddr);
        if (empty($client)) {
            throw new ConsumerException("与服务器建立连接失败");
        }

        $this->sendRequest($client, $request);

        try {
            $ret = $this->recvDataFromProvider($client, $request);
            $client->close();
        } catch (\Exception $e) {
            $client->close(true);
            throw $e;
        }

        return $ret;
    }

    /**
     * Tries up to two randomly chosen providers and returns the first
     * connected client, or NULL when every attempt failed.
     *
     * @param DubboRequest $request      receives the routing fields of each provider tried
     * @param array        $candidates   0-based list of provider URL objects
     * @param string       $providerAddr out: "host:port" of the last provider tried
     *
     * @return object|null connected client or NULL
     */
    private function connectToAnyProvider(DubboRequest $request, array $candidates, &$providerAddr)
    {
        $remaining = count($candidates);
        // At most two connection attempts, never more than there are providers.
        $maxAttempts = min(2, $remaining);

        for ($attempt = 0; $attempt < $maxAttempts && $remaining > 0; $attempt++) {
            try {
                // Random routing among the providers that have not failed yet.
                $col = mt_rand(0, $remaining - 1);
                $svrUrl = $candidates[$col];
                $host = $svrUrl->getHost();
                $port = $svrUrl->getPort();

                // Report the routing decision back to the caller.
                $providerAddr = $host.':'.$port;

                // Fields passed through to the provider.
                $request->host = $host;
                $request->port = $port;
                $request->setGroup($svrUrl->getGroup());
                $request->setVersion($svrUrl->getVersion());
                // NOTE(review): presumably converts seconds to milliseconds - confirm against DubboRequest.
                $request->setTimeout($this->iotimeout * 1000);
                $request->setSerialization($svrUrl->getSerialization(DubboParser::DUBBO_PROTOCOL_SERIALIZE_FAST_JSON));

                $client = $this->connectProvider($host, $port, $this->iotimeout);
                if (!empty($client)) {
                    return $client;
                }

                $this->logger->error("connect FSOF server[".$host.":".$port ."] failed");

                // Drop the unreachable provider; array_splice() re-indexes the
                // list, and the count is refreshed so the next random index
                // cannot land on a removed slot.
                array_splice($candidates, $col, 1);
                $remaining = count($candidates);

                // A connect timeout ends the retry loop; a refused connection
                // or any other error moves on to another provider.
                if (self::FSOF_ETIMEOUT == $this->lastErrorNo || self::FSOF_EINPROGRESS == $this->lastErrorNo) {
                    break;
                }
            } catch (\Exception $e) {
                // The provider is left in the list here, so it may be picked again.
                $this->logger->error($e->getMessage(), $e);
            }
        }

        return NULL;
    }

    /**
     * Packs and sends $request over $client. On any failure the connection is
     * force-closed exactly once, the (truncated) request is logged and a
     * ConsumerException is raised.
     *
     * @param object       $client  connected client returned by connectProvider()
     * @param DubboRequest $request
     *
     * @throws ConsumerException when packing or sending fails
     */
    private function sendRequest($client, DubboRequest $request)
    {
        try {
            $data = $this->parser->packRequest($request);
            $sent = $client->send($data, strlen($data));
        } catch (\Exception $e) {
            $client->close(true);
            $this->logger->error("send date failed:" . $this->describeRequest($request), $e);
            throw new ConsumerException("发送请求数据失败");
        }

        // Handled outside the try block so this failure is not caught again by
        // the handler above, which would close the connection a second time.
        if (!$sent) {
            $client->close(true);
            $this->logger->error("send date failed:" . $this->describeRequest($request));
            throw new ConsumerException("发送请求数据失败");
        }
    }

    /**
     * JSON-encodes the request's string form for logging, cut to 512
     * characters with the original byte length appended when it is longer.
     *
     * @param DubboRequest $request
     *
     * @return string
     */
    private function describeRequest(DubboRequest $request)
    {
        $msg = json_encode($request->__toString(), JSON_UNESCAPED_UNICODE);
        if (mb_strlen($msg, 'UTF-8') >= 512) {
            $msg = mb_substr($msg, 0, 512, 'UTF-8').' ...(len:'.strlen($msg).")";
        }

        return $msg;
    }

    /**
     * Opens a connection to one provider.
     *
     * On failure the client's error code is stored in $this->lastErrorNo; it
     * is reset to 0 at the start of every attempt so a stale code from an
     * earlier attempt is never reported.
     *
     * @param string    $host
     * @param int       $port
     * @param int|float $iotimeout connect timeout handed to the client
     *
     * @return FSOFClient4Linux|null connected client, or NULL on failure
     */
    protected function connectProvider($host, $port, $iotimeout)
    {
        $this->lastErrorNo = 0;

        try {
            $start_time = microtime(true);
            $client = new FSOFClient4Linux();
            if ($client->connect($host, $port, $iotimeout)) {
                $this->logger->debug('connect to server['.$host.":".$port."] success,timeout:".$iotimeout);
                return $client;
            }

            // Remember the error code so the caller can decide whether to retry.
            $this->lastErrorNo = $client->getlasterror();
            $cost_time = (int)((microtime(true) - $start_time) * 1000000);
            $this->logger->error('connect to server['.$host.":".$port."] failed,timeout:".$iotimeout."|".$cost_time."us".'|errcode:'.$this->lastErrorNo);
        } catch (\Exception $e) {
            $this->logger->error("Connect provider exception:",$e);
        }

        return NULL;
    }

    /**
     * Reads and decodes the response to $request from $socket.
     *
     * @param object       $socket  connected client exposing recv() and getlasterror()
     * @param DubboRequest $request request whose SN the response must echo
     *
     * @return mixed result carried by the response
     *
     * @throws ConsumerException on closed connection, timeout, SN mismatch,
     *                           unparsable response or a non-OK status
     */
    protected function recvDataFromProvider($socket, DubboRequest $request)
    {
        $fsof_data = $this->Recv($socket, DubboParser::PACKAGE_HEDA_LEN);
        if (self::isEmptyRead($fsof_data)) {
            $this->throwRecvFailure($socket);
        }

        // Parse the header.
        $response = new DubboResponse();
        $response->setFullData($fsof_data);
        $response = $this->parser->parseResponseHeader($response);
        if (!$response) {
            // NOTE(review): assumes a falsy return means the header could not
            // be parsed; previously this case ended in a fatal error below.
            $this->logger->error("parse response header err");
            throw new ConsumerException("未知异常");
        }
        if ($response->getSn() != $request->getSn()) {
            $this->logger->error("response sn[{$response->getSn()}] != request sn[{$request->getSn()}]");
            throw new ConsumerException("请求包中的sn非法");
        }

        // Body bytes that were already delivered together with the header.
        $resData = substr($response->getFullData(), DubboParser::PACKAGE_HEDA_LEN);
        $resDataLen = is_string($resData) ? strlen($resData) : 0;

        if ($resDataLen < $response->getLen()) {
            $start_time = microtime(true);
            // Fetch the rest of the body in chunks of at most MAX_RECV_LEN bytes.
            $resv_len = $response->getLen() - $resDataLen;
            $recv_data = '';
            do {
                $cur_len = (DubboParser::MAX_RECV_LEN > $resv_len) ? $resv_len : DubboParser::MAX_RECV_LEN;

                $tmpdata = $this->Recv($socket, $cur_len);
                if (self::isEmptyRead($tmpdata)) {
                    $this->throwRecvFailure($socket);
                }
                $recv_data .= $tmpdata;
                $resv_len -= $cur_len;

                // The whole multi-chunk read must fit into the I/O timeout.
                if ((microtime(true) - $start_time) > $this->iotimeout) {
                    $this->logger->error("Multi recv {$resv_len} bytes data timeout");
                    throw new ConsumerException("接收应答数据超时");
                }
            } while ($resv_len > 0);

            $response->setFullData($response->getFullData() . $recv_data);
        }

        if (!$this->parser->parseResponseBody($response)) {
            $this->logger->error("parse response body err:".$response->__toString());
            throw new ConsumerException("未知异常");
        }
        if (DubboResponse::OK != $response->getStatus()) {
            throw new ConsumerException($response->getErrorMsg());
        }

        return $response->getResult();
    }

    /**
     * Raises the exception for a failed read: error code 0 on the socket is
     * reported as "connection closed by the provider", anything else as a
     * receive timeout.
     *
     * @param object $socket
     *
     * @throws ConsumerException always
     */
    private function throwRecvFailure($socket)
    {
        if (0 == $socket->getlasterror()) {
            throw new ConsumerException("provider端己关闭网络连接");
        }

        throw new ConsumerException("接收应答数据超时");
    }

    /**
     * Tells whether a read returned nothing. The one-byte payload "0" is real
     * data even though PHP evaluates it as false, so it is excluded.
     *
     * @param mixed $data value returned by a recv call
     *
     * @return bool
     */
    private static function isEmptyRead($data)
    {
        return !$data && '0' !== $data;
    }

    /**
     * Reads exactly $len bytes from $socket, looping over short reads until
     * the data is complete, a read fails, or the I/O timeout has elapsed.
     *
     * @param object $socket client exposing recv($length)
     * @param int    $len    number of bytes wanted
     *
     * @return string|false the $len bytes, or FALSE when they could not all be read
     *
     * @throws ConsumerException when the socket raises an exception
     */
    protected function Recv($socket, $len)
    {
        try {
            $start_time = microtime(true);
            $resv_len = $len;
            $_data = '';
            do {
                $tmp_data = $socket->recv($resv_len);
                if (self::isEmptyRead($tmp_data)) {
                    $this->logger->warn("socket->recv faile:$resv_len");
                    break;
                }
                $_data .= $tmp_data;
                $resv_len -= strlen($tmp_data);
                // Reading must not take longer than the configured I/O timeout.
            } while (($resv_len > 0) && ((microtime(true) - $start_time) < $this->iotimeout));

            if ($resv_len > 0) {
                $this->logger->error("Recv $len data fail!");
                return FALSE;
            }

            return $_data;
        } catch (\Exception $e) {
            $this->logger->error('recv data exception',$e);
            if (self::FSOF_CONNECTION_RESET == $e->getCode()) {
                throw new ConsumerException("未知异常");
            }

            throw new ConsumerException("接收应答数据超时");
        }
    }
}