private function requestProcessor()

in provider/fsof/FSOFProtocol.php [179:322]


	/**
	 * Process one decoded request: locate the target service, invoke the
	 * requested method through reflection, build the response and update the
	 * application monitor counters.
	 *
	 * Flow:
	 *  1. Stamp the start time, notify the monitor and log (at most 512
	 *     characters of) the request.
	 *  2. If the consumer's connection no longer exists, skip all processing.
	 *  3. Otherwise check the provider status, resolve the service instance and
	 *     invoke the method; every failure is turned into an error string in
	 *     $result plus one of the $businessError / $frameError flags.
	 *  4. If the connection still exists, pack the response and log it;
	 *     otherwise log that the response was discarded.
	 *  5. Report onResponse() only when the method was invoked successfully and
	 *     the connection was still alive; report onError() in every other case.
	 *
	 * @param mixed  $client_id Connection identifier, as accepted by
	 *                          swoole_server->exist() and packResponse().
	 * @param object $request   Request object; this method uses its
	 *                          getService(), getGroup(), getVersion(),
	 *                          getMethod(), getParams() and __toString()
	 *                          methods and writes its startTime / endTime
	 *                          properties.
	 * @return void
	 */
	private function requestProcessor($client_id, $request)
	{
		// Execution start time.
		$request->startTime = microtime(true);
		// Monitor: count the incoming request.
		$this->server->getAppMonitor()->onRequest($request);

		// Local IP address and first configured listen port; both are handed to checkSwooleStatus().
		$appConfig = $this->getAppConfig();
		$localIP = FSOFSystemUtil::getLocalIP();
		$appPort = $appConfig['server']['listen'][0];

		// Log the request, truncated so that huge payloads do not flood the log.
		$requestText = $request->__toString();
		if (mb_strlen($requestText, 'UTF-8') >= 512)
		{
			$requestText = mb_substr($requestText, 0, 512, 'UTF-8').' ...';
		}
		$this->logger->debug("in|".$requestText);

		$businessError = false;
		$frameError = false;
		$result = null;
		// True only when the service method was invoked without throwing.
		$requestFlag = false;

		// Time the message spent waiting in the queue.
		// NOTE(review): getReqInQueueTime() presumably returns a microsecond timestamp, since it is
		// subtracted from microtime(true) * 1000000 -- confirm against DubboParser.
		$wait_InQueueTime = 0;
		$inQueueTime = DubboParser::getReqInQueueTime($request);
		if ($inQueueTime)
		{
			$wait_InQueueTime = round(microtime(true) * 1000000) - $inQueueTime;
		}

		// Before doing any work, make sure the connection is still open; a request whose
		// consumer has already disconnected is not processed at all.
		$clientConnected = $this->swoole_server->exist($client_id);

		if ($clientConnected)
		{
			$status = $this->checkSwooleStatus($request, $localIP, $appPort);
			if (self::FSOF_SWOOLE_STATUS_OK != $status)
			{
				// Message returned to the consumer: provider overloaded, the request waited in
				// the queue longer than the threshold.
				$frameError = true;
				$result = 'provider过载, 请求消息在队列等待时间超过阀值';
			}
			elseif (!$this->server->serviceExist($request->getService(), $request->getGroup(), $request->getVersion()))
			{
				$frameError = true;
				$result = 'service not found:'.$request->getGroup()."/".$request->getService().":".$request->getVersion();
				$this->logger->error(json_encode($result));
			}
			else
			{
				$serviceInstance = $this->server->getServiceInstance($request->getService(), $request->getGroup(), $request->getVersion());
				if (null == $serviceInstance)
				{
					$frameError = true;
					$result = 'get instance failed! | '.$request->getService();
					$this->logger->error(json_encode($result));
				}
				else
				{
					$failure = null;
					try
					{
						$serviceReflection = new \ReflectionObject($serviceInstance);
						if ($serviceReflection->hasMethod($request->getMethod()))
						{
							$method = $serviceReflection->getMethod($request->getMethod());
							// Allow invoking protected methods.
							$method->setAccessible(true);
							$invokeArgs = $request->getParams();
							if ($invokeArgs == null)
							{
								$invokeArgs = array();
							}
							$result = $method->invokeArgs($serviceInstance, $invokeArgs);
							$requestFlag = true;
						}
						else
						{
							$businessError = true;
							$result = 'function not found:'.$request->getMethod().' in '.$request->getService();
							$this->logger->error("[{$request->getMethod()}] function not found:".$request->getService());
						}
					}
					catch (\Exception $e)
					{
						$failure = $e;
					}
					catch (\Throwable $e)
					{
						// PHP 7+: \Error subclasses (TypeError, ArgumentCountError, ...) are not
						// \Exception instances. Without this clause a bad argument list from the
						// consumer would escape this method and no response would be produced.
						// On PHP 5 this clause simply never matches.
						$failure = $e;
					}

					if (null !== $failure)
					{
						$this->logger->error($failure);
						$frameError = true;
						$result = $failure->getMessage().' in '.$failure->getFile().'|'.$failure->getLine();
					}

					// For a stateful provider service the instance is dropped after use so the next
					// request gets a fresh one (guards against memory leaks); for a stateless
					// service the instance is reused.
					if (!$this->server->isStateless())
					{
						unset($serviceInstance);
					}
					unset($method);
					unset($serviceReflection);
				}
			}
		}

		// Execution end time and elapsed time in microseconds.
		$request->endTime = microtime(true);
		$cost_time = (int)(($request->endTime - $request->startTime) * 1000000);

		// Re-check the connection only if it was alive before processing; it may have been
		// closed while the service method was running.
		if ($clientConnected && $this->swoole_server->exist($client_id))
		{
			// Build the response (the original comment on this call reads "send response").
			$response = $this->packResponse($client_id, $request, $result, $businessError, $frameError);
			$msg = $response->__toString();
			if (mb_strlen($msg, 'UTF-8') >= 512)
			{
				// Keep the first 512 characters and append the full byte length.
				$msg = mb_substr($msg, 0, 512, 'UTF-8').' ...('.strlen($msg).')';
			}

			$this->logger->debug(sprintf("out|%s|invokeCostTime:%dus|waitInQueueTime:%dus", $msg, $cost_time, $wait_InQueueTime));
		}
		else
		{
			$errMsg = "socket closed by consumer, provider discard response data";
			$this->logger->error("out|{$errMsg}|invokeCostTime:{$cost_time}us| waitInQueueTime:{$wait_InQueueTime}us");
			// A discarded response is counted as an error even if the invocation succeeded.
			$requestFlag = false;
		}

		if ($requestFlag)
		{
			// Monitor: count a successfully handled request.
			$this->server->getAppMonitor()->onResponse($request);
		}
		else
		{
			// Monitor: count a failed request.
			$this->server->getAppMonitor()->onError($request);
		}
	}