provider/core/server/FSOFRegistry.php (298 lines of code) (raw):

<?php
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
namespace com\fenqile\fsof\provider\core\server;

use com\fenqile\fsof\common\url\FSOFUrl;
use com\fenqile\fsof\common\file\FSOFRedis;
use com\fenqile\fsof\common\log\FSOFSystemUtil;
use com\fenqile\fsof\common\config\FSOFConstants;
use com\fenqile\fsof\registry\automatic\RegistryServiceFactory;

/**
 * Builds dubbo-scheme provider URLs for the configured services and
 * registers / unregisters them through the registry service returned by
 * RegistryServiceFactory (a Zookeeper connection).
 *
 * Typical usage from the server: instance() -> setParams() -> registerZk(),
 * and unRegisterZk() on shutdown.
 */
class FSOFRegistry
{
    // Zookeeper log levels (0 = logging off; 1.error; 2.warn; 3.info; 4.debug)
    const ZOOKEEPER_LOG_NO = 0;
    const ZOOKEEPER_LOG_ERROR = 1;
    const ZOOKEEPER_LOG_WARN = 2;
    const ZOOKEEPER_LOG_INFO = 3;
    const ZOOKEEPER_LOG_DEBUG = 4;

    protected $appName;
    protected $port;
    protected $config;
    protected $serverProviders;
    protected $localIp = '127.0.0.1';
    // false = static registration mode (register once, then drop the connection)
    protected $ephemeral = false;

    // Zookeeper related state
    protected $zkService = null;
    protected $fsofUrlList = array();
    // When truthy, registerZk() and unRegisterZk() do nothing.
    protected $start_without_registry;

    private static $_instance;
    private $logger;

    /**
     * Returns the shared FSOFRegistry instance, creating it on first use.
     *
     * @return FSOFRegistry
     */
    public static function instance()
    {
        if (self::$_instance === null) {
            self::$_instance = new FSOFRegistry();
        }

        return self::$_instance;
    }

    public function __construct()
    {
        $this->logger = \Logger::getLogger(__CLASS__);
    }

    /**
     * Stores the application parameters used for later registration and
     * resolves the local service IP.
     *
     * NOTE: $registry is copied verbatim into $start_without_registry, so a
     * truthy value (including the default) DISABLES registration.
     *
     * @param string $appName         application name (used in log messages)
     * @param array  $appConfig       application configuration; the keys
     *                                'fsof_setting' and 'service_properties' are read
     * @param mixed  $port            port placed into every provider URL
     * @param array  $serverProviders map of interface name => service settings
     * @param bool   $registry        truthy = start without registry
     */
    public function setParams($appName, $appConfig, $port, $serverProviders, $registry = true)
    {
        $this->logger->info("serverProviders:".json_encode($serverProviders));

        $this->appName = $appName;
        $this->port = $port;
        $this->config = $appConfig;
        $this->serverProviders = $serverProviders;
        $this->start_without_registry = $registry;

        try {
            $this->localIp = FSOFSystemUtil::getServiceIP();
        } catch (\Exception $e) {
            $this->logger->error('The server network configuration errors', $e);
            // Without a usable IP the provider URL would be wrong: never register to zk.
            $this->start_without_registry = true;
        }
    }

    /**
     * Creates the registry service from the comma separated
     * fsof_setting.zk_url_list and connects it (one retry on failure).
     *
     * @return mixed result of connectZk(); false when the url list is not
     *               configured or an exception occurred
     */
    protected function createZookeeperService()
    {
        if (!isset($this->config['fsof_setting']['zk_url_list'])) {
            // Previously a silent failure that left $zkService null.
            $this->logger->error('connect zookeeper failed|app:' . $this->appName . '|errMsg:zk_url_list is not configured');
            return false;
        }

        $ret = false;
        $zkUrlList = $this->config['fsof_setting']['zk_url_list'];

        try {
            $registryUrl = array();
            foreach (explode(',', $zkUrlList) as $zkUrl) {
                $registryUrl[] = new FSOFUrl($zkUrl);
            }

            // Connection to zookeeper used to report and unregister service info
            $this->zkService = RegistryServiceFactory::getRegistry($registryUrl);
            // Dynamic registration happens through this callback
            $this->zkService->registerCallFunc(array($this, 'watcherCallFunc'));

            // Connect to zookeeper, retrying once
            $ret = $this->zkService->connectZk($this->ephemeral);
            if ($ret == false) {
                $ret = $this->zkService->connectZk($this->ephemeral);
            }

            if ($ret == false) {
                $this->logger->error('connect zookeeper failed|app:' . $this->appName . '|zkurl:' . $zkUrlList);
            }
        } catch (\Exception $e) {
            $this->logger->error('connect zookeeper failed|app:' . $this->appName . '|errMsg:' . $e->getMessage(), $e);
        }

        return $ret;
    }

    /**
     * Rebuilds $fsofUrlList: one dubbo-scheme FSOFUrl per entry of
     * $serverProviders. Entries whose URL cannot be constructed are logged
     * and skipped.
     */
    protected function ServiceSerialize()
    {
        try {
            // Reset (not unset) so the declared property keeps existing.
            $this->fsofUrlList = array();

            if (empty($this->serverProviders)) {
                return;
            }

            // Registration time (currently disabled):
            // $this->config["service_properties"]["timestamp"] = (int)(microtime(true) * 1000);
            $this->config["service_properties"]["dynamic"] = $this->ephemeral ? "true" : "false";

            foreach ($this->serverProviders as $interface => $serviceInfo) {
                // Merge the global configuration
                if (isset($this->config["service_properties"])) {
                    // Interface settings take priority over global ones, hence $serviceInfo goes last
                    $serviceInfo = array_merge($this->config["service_properties"], $serviceInfo);
                }

                // Drop the entries that must not be reported
                unset($serviceInfo['service']);
                unset($serviceInfo['p2p_mode']);

                if (empty($serviceInfo["version"])) {
                    $serviceInfo['version'] = FSOFConstants::FSOF_SERVICE_VERSION_DEFAULT;
                }

                // Dubbo compatibility
                $serviceInfo['interface'] = $interface; // needed by dubbo_admin
                // Serialization method
                $serviceInfo['serialization'] = "fastjson";
                ksort($serviceInfo); // sorted parameters, compatible with dubbo

                $urlPara = array(
                    'scheme' => 'dubbo',
                    'host' => $this->localIp,
                    'port' => $this->port,
                    'path' => '/' . $interface,
                    // http_build_query urlencodes the values, which would leave the query
                    // encoded one time too many; urldecode cancels that out
                    'query' => urldecode(http_build_query($serviceInfo)),
                );
                $this->logger->debug("serviceInfo:" . json_encode($serviceInfo) . "|urlPara:" . json_encode($urlPara));

                try {
                    // Append inside the try: a failed construction must not add an
                    // undefined value or the previous iteration's URL to the list.
                    $this->fsofUrlList[] = new FSOFUrl($urlPara);
                } catch (\Exception $e) {
                    $this->logger->error('init url failed|app:' . $this->appName . '|urlPara:' . json_encode($urlPara), $e);
                }
            }
        } catch (\Exception $e) {
            $errMsg = $e->getMessage();
            $this->logger->error('ServiceSerialize:' . $errMsg, $e);
        }
    }

    /**
     * Registers every URL of $fsofUrlList with the registry service.
     *
     * @return mixed false when there is nothing to register or no registry
     *               service; otherwise the result of the LAST register() call
     *               (false if that call threw).
     *               NOTE(review): earlier failures are only visible in the log.
     */
    protected function registerServiceToZk()
    {
        $ret = false;

        if (empty($this->fsofUrlList)) {
            return $ret;
        }

        if (!$this->hasZkService()) {
            $this->logger->error('register|app:' . $this->appName . '|errMsg:zookeeper service is not initialized');
            return $ret;
        }

        foreach ($this->fsofUrlList as $fsofUrl) {
            try {
                $ret = $this->zkService->register($fsofUrl);
            } catch (\Exception $e) {
                $ret = false;
                $errMsg = $e->getMessage();
                $this->logger->error('register|app:'.$this->appName.'|url:'.$fsofUrl->getOriginUrl().'|path:'.$fsofUrl->getZookeeperPath().'|errMsg:'.$errMsg);
            }
        }

        return $ret;
    }

    /**
     * Unregisters every URL of $fsofUrlList and additionally every provider
     * URL returned by FSOFRedis::getProviderInfo() that matches it on
     * service, host, port, application and language. Closes the Redis
     * connection afterwards.
     *
     * @return mixed false when there is nothing to unregister or no registry
     *               service; otherwise the result of the LAST unregister()
     *               call (false if the last URL raised an exception)
     */
    protected function unRegisterServiceFromZk()
    {
        $ret = false;

        if (empty($this->fsofUrlList)) {
            return $ret;
        }

        if (!$this->hasZkService()) {
            $this->logger->error('unRegister|app:' . $this->appName . '|errMsg:zookeeper service is not initialized');
            return $ret;
        }

        foreach ($this->fsofUrlList as $fsofUrl) {
            try {
                // Remove the registration of this service
                $ret = $this->zkService->unregister($fsofUrl);

                // Remove stale ("dirty") registrations of the same provider
                $host = $fsofUrl->getHost();
                $port = $fsofUrl->getPort();
                $service = $fsofUrl->getService();
                $language = $fsofUrl->getParams('language');
                $application = $fsofUrl->getParams('application');

                $providerInfo = FSOFRedis::instance()->getProviderInfo($service);
                if (!is_array($providerInfo)) {
                    continue;
                }

                foreach ($providerInfo as $url) {
                    $urlObj = new FSOFUrl($url);
                    $sameProvider = $service == $urlObj->getService()
                        && $host == $urlObj->getHost()
                        && $port == $urlObj->getPort()
                        && $application == $urlObj->getParams('application')
                        && $language == $urlObj->getParams('language');

                    if ($sameProvider) {
                        $ret = $this->zkService->unregister($urlObj);
                        $this->logger->info('unRegister|app:'.$this->appName.'|url:'.$urlObj->getOriginUrl());
                    }
                }
            } catch (\Exception $e) {
                $ret = false;
                $errMsg = $e->getMessage();
                $this->logger->error('unRegister|app:'.$this->appName.'|url:'.$fsofUrl->getOriginUrl().'|path:'.$fsofUrl->getZookeeperPath().'|errMsg:'.$errMsg);
            }
        }

        // Disconnect from redis
        FSOFRedis::instance()->close();

        return $ret;
    }

    /**
     * Registers watcherCallFunc() as the callback of the registry service.
     */
    protected function setWatcherCallFunc()
    {
        if (!$this->hasZkService()) {
            $this->logger->error('setWatcherCallFunc|app:' . $this->appName . '|errMsg:zookeeper service is not initialized');
            return;
        }

        $this->zkService->registerCallFunc(array($this, 'watcherCallFunc'));
    }

    /**
     * Configures the zookeeper client log from fsof_setting.zklog_level and
     * fsof_setting.zklog_path. Nothing happens when the level is absent or
     * not greater than ZOOKEEPER_LOG_NO; levels above debug are clamped.
     */
    protected function setZkLog()
    {
        $zkLog_level = isset($this->config['fsof_setting']['zklog_level'])
            ? $this->config['fsof_setting']['zklog_level']
            : self::ZOOKEEPER_LOG_NO;

        // Zookeeper log output is switched off
        if ($zkLog_level <= self::ZOOKEEPER_LOG_NO) {
            return;
        }

        if (!$this->hasZkService()) {
            $this->logger->error('setZkLog|app:' . $this->appName . '|errMsg:zookeeper service is not initialized');
            return;
        }

        if ($zkLog_level > self::ZOOKEEPER_LOG_DEBUG) {
            $zkLog_level = self::ZOOKEEPER_LOG_DEBUG;
        }

        // Log file and log level (1.error; 2.warn; 3.info; 4.debug)
        $zkLogPath = !empty($this->config['fsof_setting']['zklog_path'])
            ? $this->config['fsof_setting']['zklog_path']
            : "/var/fsof/provider/zookeeper.log";

        $this->zkService->setLogFile($zkLogPath, $zkLog_level);
    }

    /**
     * Drops the reference to the registry service. The property is reset to
     * null rather than unset so later reads do not hit an undefined property.
     */
    protected function inventZkService()
    {
        $this->zkService = null;
    }

    /**
     * Tells whether a registry service object is currently held.
     *
     * @return bool
     */
    private function hasZkService()
    {
        return !empty($this->zkService);
    }

    /*
     * Functions provided for external callers
     */

    /**
     * Builds the service URLs, connects to zookeeper and, in static
     * (non-ephemeral) mode, registers the services immediately and then
     * releases the connection. Does nothing when $start_without_registry is
     * truthy. Exceptions are logged, not propagated.
     */
    public function registerZk()
    {
        if ($this->start_without_registry) {
            return;
        }

        try {
            $this->logger->info("init zk start...");

            // Generate the service urls
            $this->ServiceSerialize();
            $this->createZookeeperService();
            $this->setZkLog();

            if (!$this->ephemeral) {
                // Static registration mode
                $this->registerServiceToZk();
                $this->inventZkService();
            }
            // Otherwise registration happens in watcherCallFunc() once the connection is up

            $this->logger->info("init zk end...");
        } catch (\Exception $e) {
            $this->logger->error($e->getMessage(), $e);
        }
    }

    /**
     * Unregisters the services built by registerZk() and releases the
     * registry service. Does nothing when $start_without_registry is truthy.
     * Exceptions are logged, not propagated.
     */
    public function unRegisterZk()
    {
        if ($this->start_without_registry) {
            return;
        }

        try {
            if (!$this->ephemeral) {
                // Static registration mode dropped its connection after registering: reconnect
                $this->createZookeeperService();
            }

            $this->unRegisterServiceFromZk();
            $this->inventZkService();
            $this->logger->info("unRegisterZk");
        } catch (\Exception $e) {
            $this->logger->error($e->getMessage(), $e);
        }
    }

    /**
     * Callback handed to the registry service via registerCallFunc().
     * Reads the connection state from the second callback argument:
     * connected -> register the services; session expired -> run
     * registerZk() again; anything else is logged as an error.
     *
     * NOTE(review): the argument layout is presumably (type, state, path) as
     * used by php-zookeeper watchers - confirm against the registry service.
     */
    public function watcherCallFunc()
    {
        $args = func_get_args();
        // json_encode's second parameter is a flags bitmask, not a boolean.
        $this->logger->info("watcherCallFunc:".json_encode($args));

        $state = isset($args[1]) ? $args[1] : null;

        if (\Zookeeper::CONNECTED_STATE == $state) {
            $this->logger->info("connect to zk:OK");
            $this->registerServiceToZk();
        } else if (\Zookeeper::SESSIONEXPIRED == $state) {
            // NOTE(review): SESSIONEXPIRED is an error-code constant; it is presumably
            // numerically equal to EXPIRED_SESSION_STATE - confirm.
            $this->logger->error("zk's session expired, reconnect to zk");
            $this->registerZk();
        } else {
            // The logger does not interpolate format strings; build the message here.
            $this->logger->error(sprintf("zk's connect expired, errorCode:%d", $state));
        }
    }
}