provider/fsof/FSOFProtocol.php (388 lines of code) (raw):

<?php /** * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ namespace com\fenqile\fsof\provider\fsof; use com\fenqile\fsof\common\log\FSOFSystemUtil; use com\fenqile\fsof\common\context\FSOFContext; use com\fenqile\fsof\common\protocol\fsof\DubboParser; use com\fenqile\fsof\common\protocol\fsof\DubboRequest; use com\fenqile\fsof\common\protocol\fsof\DubboResponse; use com\fenqile\fsof\provider\core\protocol\BufferedProtocol; class FSOFProtocol extends BufferedProtocol { protected $parser; //缓存各consumer请求包数据,直到请求包数据完整接收到 protected $buffer_header = array(); //定义swoole work 运行状态 const FSOF_SWOOLE_STATUS_OK = 0; const FSOF_SWOOLE_STATUS_OVERLOAD = 1; private $logger; public function init() { $this->logger = \Logger::getLogger(__CLASS__); $this->parser = new DubboParser(); } private function checkHeader($client_id, $fsof_data) { $request = NULL; if (!isset($this->requests[$client_id])) { //新连接 $this->logger->debug("new request from {$client_id}"); if (!empty($this->buffer_header[$client_id])) { $fsof_data = $this->buffer_header[$client_id].$fsof_data; } $this->buffer_header[$client_id] = $fsof_data; //数据长度还不够 if (strlen($fsof_data) < DubboParser::PACKAGE_HEDA_LEN) { return false; } else { unset($this->buffer_header[$client_id]); $request = new DubboRequest(); $request->setFullData($fsof_data); $request = $this->parser->parseRequestHeader($request); //解析失败 if ($request == false) { $this->logger->error("parse request Header fail. fsof_data=" . $fsof_data); return false; } //保存请求 $this->logger->debug("create one request for {$client_id}"); $this->requests[$client_id] = $request; } } else { $this->logger->debug("append request data for {$client_id}"); $request = $this->requests[$client_id]; $request->setFullData($fsof_data); } return $request; } public function checkBuffer($client_id, $data) { //检测头 $request = $this->checkHeader($client_id, $data); //错误的http头 if ($request === false) { if (empty($this->buffer_header[$client_id])) { $this->logger->error("fsof header err."); return self::STATUS_ERROR; } else { $this->logger->debug("wait head data. fd={$client_id}"); return self::STATUS_WAIT; } } if ($request->getRequestLen() <= strlen($request->getFullData())) { if($this->parser->isHearBeatRequest($request)) { //心跳机制 return self::STATUS_FINISH; } else { if ($this->parser->parseRequestBody($request)) { $this->logger->debug("parse request ok!"); return self::STATUS_FINISH; } else { $this->logger->error("fsof body err."); return self::STATUS_ERROR; } } } else { $this->logger->debug("wait body data. fd={$client_id}"); return self::STATUS_WAIT; } } private function checkSwooleStatus($request, $host, $port) { $status = self::FSOF_SWOOLE_STATUS_OK; $appConfig = $this->getAppConfig(); if(false == $appConfig['fsof_setting']['overload_mode']) { return $status; } $reqInQueueTime = DubboParser::getReqInQueueTime($request); if($reqInQueueTime) { $appName = $this->getAppName(); //waiting time inqueue(ms) $waitingTime = $appConfig['fsof_setting']['waiting_time']; //Number of packet overload in a row $overloadNumber = $appConfig['fsof_setting']['overload_number']; $lossNumber = $appConfig['fsof_setting']['loss_number']; $this->logger->debug($host.":".$port.', waitingTime = '.$waitingTime.', overloadNumber = '.$overloadNumber.', lossNumber = '.$lossNumber); $curTimestamp = round(microtime(TRUE)*1000); $reqInQueueTime_ms = round($reqInQueueTime / 1000); if(($curTimestamp - $reqInQueueTime_ms) >= $waitingTime) { //swoole_table记录过载的次数新增1 $this->server->getOverloadMonitor()->overloadIncr(); $this->logger->warn($host.':'.$port.'|' .$appName.'|服务过载|inQueue:'.$reqInQueueTime_ms.'; curTime:'.$curTimestamp.';waitingTime:'.$waitingTime); //判断连续过载次数是否达到开启丢包请求阀值 if ($this->server->getOverloadMonitor()->getoverloadNum() >= $overloadNumber) { //重置过载次数,开启过载丢包模式 $this->server->getOverloadMonitor()->resetOverloadNum_setLossNum($lossNumber); $this->logger->error($host.':'.$port.'|' .$appName.'|服务连续过载,开启丢消息模式'); } $status = self::FSOF_SWOOLE_STATUS_OVERLOAD; } else { //清理swoole_table过载记录数量 $this->server->getOverloadMonitor()->clear(); } } return $status; } private function requestProcessor($client_id, $request) { //开始执行时间 $request->startTime = microtime(true); //监控请求数量 $this->server->getAppMonitor()->onRequest($request); //设置traceContext, 增加本地的IP地址及APP的端口 $appConfig = $this->getAppConfig(); $localIP = FSOFSystemUtil::getLocalIP(); $appPort = $appConfig['server']['listen'][0]; $params = $request->__toString(); if (mb_strlen($params, 'UTF-8') >= 512) { $params = mb_substr($params, 0, 512, 'UTF-8').' ...'; } $this->logger->debug("in|".$params); $businessError = false; $frameError = false; $result = null; //业务处理状态 $requestFlag = false; //返回给客户端执行结果信息 //$errMsg = 'ok'; //异常信息 //消息在队列等待时间 $wait_InQueueTime = 0; $inQueueTime = DubboParser::getReqInQueueTime($request); if($inQueueTime) { $wait_InQueueTime = round(microtime(true)*1000000) - $inQueueTime; } //处理前先检测连接是否仍正常,如己断开则不进行处理 if(!$this->swoole_server->exist($client_id)) { //执行结束时间 $request->endTime = microtime(true); $cost_time = (int)(($request->endTime - $request->startTime)* 1000000); goto END_TCP_CLOSE; } $status = $this->checkSwooleStatus($request, $localIP, $appPort); if(self::FSOF_SWOOLE_STATUS_OK == $status) { if($this->server->serviceExist($request->getService(), $request->getGroup(), $request->getVersion())) { $serviceInstance = $this->server->getServiceInstance($request->getService(), $request->getGroup(), $request->getVersion()); if (null != $serviceInstance) { try { $serviceReflection = new \ReflectionObject($serviceInstance); if ($serviceReflection->hasMethod($request->getMethod())) { $method = $serviceReflection->getmethod($request->getMethod()); //允许invoke protected方法 $method->setAccessible(true); $params = $request->getParams(); if($params == NULL) { $params = array(); } $result = $method->invokeArgs($serviceInstance, $params); $requestFlag = true; } else { $businessError = true; $result = 'function not found:'.$request->getMethod().' in '.$request->getService(); $this->logger->error("[{$request->getMethod()}] function not found:".$request->getService()); } } catch (\Exception $e) { $this->logger->error($e); $frameError = true; $result = $e->getMessage().' in '.$e->getFile().'|'.$e->getLine(); } //如果provider service有状态,则$serviceInstance用完后unset,下次请求重新new, 防止内存泄漏; 对于无状态的service,AppContext会复用$serviceInstance if (!$this->server->isStateless()) { unset($serviceInstance); } unset($method); unset($serviceReflection); } else { $frameError = true; $result ='get instance failed! | '.$request->getService(); $this->logger->error(json_encode($result)); } } else { $frameError = true; $result = 'service not found:'.$request->getGroup()."/".$request->getService().":".$request->getVersion(); $this->logger->error(json_encode($result)); } } else { $frameError = true; $result = 'provider过载, 请求消息在队列等待时间超过阀值'; } $request->endTime = microtime(true);//执行结束时间 $cost_time = (int)(($request->endTime - $request->startTime)* 1000000); if($this->swoole_server->exist($client_id)) { //发送response $response = $this->packResponse($client_id, $request, $result, $businessError,$frameError); $msg = $response->__toString(); if (mb_strlen($msg, 'UTF-8') >= 512) { $msg = mb_substr($msg, 0, 512, 'UTF-8').' ...('.strlen($msg).')'; } $this->logger->debug(sprintf("out|%s|invokeCostTime:%dus|waitInQueueTime:%dus", $msg, $cost_time, $wait_InQueueTime)); } else { END_TCP_CLOSE: $errMsg = "socket closed by consumer, provider discard response data"; $this->logger->error("out|{$errMsg}|invokeCostTime:{$cost_time}us| waitInQueueTime:{$wait_InQueueTime}us"); $requestFlag = false; } if($requestFlag) { //监控请求正常处理数量 $this->server->getAppMonitor()->onResponse($request); } else { //监控请求错误处理数量 $this->server->getAppMonitor()->onError($request); } } public function onOneRequest($client_id, $request) { if($this->parser->isHearBeatRequest($request)) { //心跳请求,不需要数据回送 } else if($this->parser->isNormalRequest($request) || $this->parser->isOneWayRequest($request)) { //获取配置信息 $appConfig = $this->getAppConfig(); //Number of packet loss in a row $lossNumber = $appConfig['fsof_setting']['loss_number']; //判断是否开启过载丢包模式 $restOfLostNum = $this->server->getOverloadMonitor()->getLossNum(); if($restOfLostNum > 0) { // 加入对过载丢失数据的统计 $this->server->getAppMonitor()->onRequest($request); //回复客户端 if($restOfLostNum >= $lossNumber) { if($this->swoole_server->exist($client_id)) { $result = 'provider连续过载, 开启丢消息模式'; $this->packResponse($client_id, $request, $result, false, true); } } else { if($this->swoole_server->exist($client_id)) { $result = 'provider开启丢消息模式, 连续丢包中...'; $this->packResponse($client_id, $request, $result, false, true); } } //递减丢包数量 $this->server->getOverloadMonitor()->lossNumDecr(); //监控请求错误处理数量 $this->server->getAppMonitor()->onError($request); } else { $this->requestProcessor($client_id,$request); } } else { $this->logger->error("invalid request = $request"); } } public function packResponse($client_id, $request, $data, $businessError,$frameError) { $response = new DubboResponse(); $response->setSn($request->getSn()); if($frameError){ $response->setStatus(DubboResponse::SERVICE_ERROR); $response->setErrorMsg($data); } if($businessError){ $response->setErrorMsg($data); } $response->setResult($data); $this->sendResponse($response, $client_id); return $response; } public function sendResponse(DubboResponse $response, $client_id) { try { $send_data = $this->parser->packResponse($response); $send_len = strlen($send_data); //默认所有server的response最大5M,每个分包允许重发2次,预计最多10次循环,防止网络出错导致server直接挂死在循环中 $cnt = (($send_len / DubboParser::RESPONSE_TCP_SEGMENT_LEN) + 1) * 2; $tmp_len = $send_len; for ($i = 0; $i < $cnt; $i++) { if ($tmp_len > DubboParser::RESPONSE_TCP_SEGMENT_LEN) { //大于1M 分段发送 $tmp_data = substr($send_data, 0, DubboParser::RESPONSE_TCP_SEGMENT_LEN); if ($this->swoole_server->send($client_id, $tmp_data)) { $tmp_len -= DubboParser::RESPONSE_TCP_SEGMENT_LEN; $send_data = substr($send_data, DubboParser::RESPONSE_TCP_SEGMENT_LEN); } else { if (strtoupper(substr(PHP_OS, 0, 3)) === 'WIN') { $last_error_no = $this->swoole_server->errno(); } else { $last_error_no = swoole_errno(); } if (0 == $last_error_no) { //表示该连接己关闭 $this->logger->error("当前连接己关闭,发送失败"); break; } else { $this->logger->error('send response split package fail one time!'); } } $this->logger->warn('the length of response: '.$send_len.'; send split package '.$i.'/'.$cnt); } else { //小于1M一次性发完 if ($this->swoole_server->send($client_id, $send_data)) { break; } else { if (strtoupper(substr(PHP_OS, 0, 3)) === 'WIN') { $last_error_no = $this->swoole_server->errno(); } else { $last_error_no = swoole_errno(); } if (0 == $last_error_no) { //表示该连接己关闭 $this->logger->error("当前连接己关闭,发送失败"); break; } else { $this->logger->error('send response last package fail one time!'); } } } } } catch (\Exception $e) { $this->logger->error($e->getMessage(), $e); } return $send_len; } }