in provider/fsof/FSOFProtocol.php [179:322]
private function requestProcessor($client_id, $request)
{
//开始执行时间
$request->startTime = microtime(true);
//监控请求数量
$this->server->getAppMonitor()->onRequest($request);
//设置traceContext, 增加本地的IP地址及APP的端口
$appConfig = $this->getAppConfig();
$localIP = FSOFSystemUtil::getLocalIP();
$appPort = $appConfig['server']['listen'][0];
$params = $request->__toString();
if (mb_strlen($params, 'UTF-8') >= 512)
{
$params = mb_substr($params, 0, 512, 'UTF-8').' ...';
}
$this->logger->debug("in|".$params);
$businessError = false;
$frameError = false;
$result = null;
//业务处理状态
$requestFlag = false;
//返回给客户端执行结果信息
//$errMsg = 'ok'; //异常信息
//消息在队列等待时间
$wait_InQueueTime = 0;
$inQueueTime = DubboParser::getReqInQueueTime($request);
if($inQueueTime)
{
$wait_InQueueTime = round(microtime(true)*1000000) - $inQueueTime;
}
//处理前先检测连接是否仍正常,如己断开则不进行处理
if(!$this->swoole_server->exist($client_id))
{
//执行结束时间
$request->endTime = microtime(true);
$cost_time = (int)(($request->endTime - $request->startTime)* 1000000);
goto END_TCP_CLOSE;
}
$status = $this->checkSwooleStatus($request, $localIP, $appPort);
if(self::FSOF_SWOOLE_STATUS_OK == $status)
{
if($this->server->serviceExist($request->getService(), $request->getGroup(), $request->getVersion()))
{
$serviceInstance = $this->server->getServiceInstance($request->getService(), $request->getGroup(), $request->getVersion());
if (null != $serviceInstance)
{
try
{
$serviceReflection = new \ReflectionObject($serviceInstance);
if ($serviceReflection->hasMethod($request->getMethod()))
{
$method = $serviceReflection->getmethod($request->getMethod());
//允许invoke protected方法
$method->setAccessible(true);
$params = $request->getParams();
if($params == NULL)
{
$params = array();
}
$result = $method->invokeArgs($serviceInstance, $params);
$requestFlag = true;
}
else
{
$businessError = true;
$result = 'function not found:'.$request->getMethod().' in '.$request->getService();
$this->logger->error("[{$request->getMethod()}] function not found:".$request->getService());
}
}
catch (\Exception $e)
{
$this->logger->error($e);
$frameError = true;
$result = $e->getMessage().' in '.$e->getFile().'|'.$e->getLine();
}
//如果provider service有状态,则$serviceInstance用完后unset,下次请求重新new, 防止内存泄漏; 对于无状态的service,AppContext会复用$serviceInstance
if (!$this->server->isStateless())
{
unset($serviceInstance);
}
unset($method);
unset($serviceReflection);
}
else
{
$frameError = true;
$result ='get instance failed! | '.$request->getService();
$this->logger->error(json_encode($result));
}
}
else
{
$frameError = true;
$result = 'service not found:'.$request->getGroup()."/".$request->getService().":".$request->getVersion();
$this->logger->error(json_encode($result));
}
}
else
{
$frameError = true;
$result = 'provider过载, 请求消息在队列等待时间超过阀值';
}
$request->endTime = microtime(true);//执行结束时间
$cost_time = (int)(($request->endTime - $request->startTime)* 1000000);
if($this->swoole_server->exist($client_id))
{
//发送response
$response = $this->packResponse($client_id, $request, $result, $businessError,$frameError);
$msg = $response->__toString();
if (mb_strlen($msg, 'UTF-8') >= 512)
{
$msg = mb_substr($msg, 0, 512, 'UTF-8').' ...('.strlen($msg).')';
}
$this->logger->debug(sprintf("out|%s|invokeCostTime:%dus|waitInQueueTime:%dus", $msg, $cost_time, $wait_InQueueTime));
}
else
{
END_TCP_CLOSE:
$errMsg = "socket closed by consumer, provider discard response data";
$this->logger->error("out|{$errMsg}|invokeCostTime:{$cost_time}us| waitInQueueTime:{$wait_InQueueTime}us");
$requestFlag = false;
}
if($requestFlag)
{
//监控请求正常处理数量
$this->server->getAppMonitor()->onResponse($request);
}
else
{
//监控请求错误处理数量
$this->server->getAppMonitor()->onError($request);
}
}