consumer/proxy/ProxyFactory.php (153 lines of code) (raw):
<?php
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */
namespace com\fenqile\fsof\consumer\proxy;
use com\fenqile\fsof\common\url\FSOFUrl;
use com\fenqile\fsof\common\log\FSOFSystemUtil;
use com\fenqile\fsof\common\config\FSOFConstants;
use com\fenqile\fsof\common\config\FSOFCommonUtil;
use com\fenqile\fsof\registry\automatic\ConsumerProxy;
use com\fenqile\fsof\consumer\ConsumerException;
final class ProxyFactory
{
	/**
	 * @var  App加载的配置文件路径
	 */
	protected static $appConfigFile = '';
	/**
	 * @var  p2p模式开关
	 */
	protected static $p2pMode = FALSE;
    /**
     * @var app所属的group
     */
    protected static $appGroup = FSOFConstants::FSOF_SERVICE_GROUP_ANY;
    /**
     * @var app所属的version
     */
    protected static $appVersion = FSOFConstants::FSOF_SERVICE_VERSION_DEFAULT;
        
    /**
     * @var  appname
     */
    protected static $appName = 'consumer';
	/**
	 * @var  instance array
	 */
    protected static $serviceInstances = array();
    /**
	 * @var  *.consumer中配置的service信息
	 */
	protected static $serviceConsumers = array();
    /**
     * @var  *.consumer中配置的config信息
     */
	protected static $configConsumer = Array();
    private static $logger;
	public static function setConsumerConfig($configData, $consumerConfigFile, $initSettings)
	{
        self::$logger = \Logger::getLogger(__CLASS__);
		//App名字
		self::$appName = $initSettings['app_name'];
		//获取app加载配置文件;输出日志方便问题定位
		self::$appConfigFile = $consumerConfigFile;
		//是否开启p2p模式, p2p模式下, 将使用[consumer_services]下的信息进行路由
		if(isset($configData['consumer_config']['p2p_mode']) && $configData['consumer_config']['p2p_mode'])
		{
			self::$p2pMode = $configData['consumer_config']['p2p_mode'];
		}
		if (isset($configData['consumer_config']['group']))
		{
			self::$appGroup = $configData['consumer_config']['group'];
		}
		if(isset($configData['consumer_config']['version']))
		{
			self::$appVersion = $configData['consumer_config']['version'];
		}
        if(isset($configData['consumer_config']))
        {
            self::$configConsumer = $configData['consumer_config'];
        }
		if(isset($configData['consumer_services']))
		{
			self::$serviceConsumers = $configData['consumer_services'];
		}
	}
    //use automatic registry
    private static function getInstancByRedis($service, $ioTimeOut, $version, $group)
	{
        $ret = NULL;
        $providerInfo = ConsumerProxy::instance(self::$configConsumer)->getProviders($service, $version, $group);
        if(!empty($providerInfo))
        {
            $cacheKey = $service.':'.$version.':'.$group;
            if(empty(self::$serviceInstances[$cacheKey]))
            {
                $ret = Proxy::newProxyInstance($service,  self::$appName, $group);
                self::$serviceInstances[$cacheKey] = $ret;
            }
            else
            {
                $ret = self::$serviceInstances[$cacheKey];
            }
            //设置io超时时间
            $ret->setIOTimeOut($ioTimeOut);
			
            //设置地址列表
            $ret->setAddress($providerInfo);
        }
        else
        {
            self::$logger->error("not find providers form redis for $service:$version:$group");
        }
        
        return $ret;     
    }
    //use p2p mode
    private static function getInstanceByP2P($service, $ioTimeOut, $version, $group)
    {
        $ret = NULL;
        if(array_key_exists($service, self::$serviceConsumers))
        {
            $serviceProperty = self::$serviceConsumers[$service];
			if (isset($serviceProperty['url']))
			{
				$ret = Proxy::newProxyInstance($service, self::$appName, $group);
				//设置io超时时间
				$ret->setIOTimeOut($ioTimeOut);
				//设置所用服务的ip地址列表
				$serviceAddr = explode(",",$serviceProperty['url']);
				$serviceUrls = array();
				foreach ($serviceAddr as $index => $addr)
				{
					$tmpUrl = $addr.'/'."{$service}?version={$version}&group={$group}";
					$serviceUrls[] = new FSOFUrl($tmpUrl);
				}
				$ret->setAddress($serviceUrls);
			}
			else
			{
                self::$logger->warn(self::$appName.'.consumer not exist url');
			}
        }
        else 
        {
            self::$logger->warn('service not found on p2p|consumer_app:'.self::$appName.'|provider_service:'.$service);
        }
        
        return $ret;         
    }
    public static function getInstance($consumerInterface, $ioTimeOut = 3, $version = null, $group = null)
    {
    	$ret = NULL;
        $route = '';
        $addressList = 'null';
		//app级组和版本信息
		$group = self::$appGroup;
		$versionList = self::$appVersion;
		if (array_key_exists($consumerInterface, self::$serviceConsumers))
		{
            $serviceProperty = self::$serviceConsumers[$consumerInterface];
			if(isset($serviceProperty['group']))
			{
				$group = $serviceProperty['group'];
			}
			if(isset($serviceProperty['version']))
			{
				$versionList = $serviceProperty['version'];
			}
		}
		try
		{
			//依据配置权重选取版本号
			$version = FSOFCommonUtil::getVersionByWeight($versionList);
			//p2p模式
			if (self::$p2pMode)
			{
				$ret = self::getInstanceByP2P($consumerInterface, $ioTimeOut, $version, $group);
				$route = 'p2p';
			}
			if (empty($ret))
			{
				//registry 模式
				$ret = self::getInstancByRedis($consumerInterface, $ioTimeOut, $version, $group);
				$route = 'auto registry';
			}
			if (empty($ret))
			{
				$errMsg = "current_address:".FSOFSystemUtil::getLocalIP()."|".$consumerInterface;
                throw new ConsumerException($errMsg);
			}
			else
			{
                $addressList = $ret->getAddressStr();
			}
            self::$logger->debug('consumer_app:'.self::$appName.'|app_config_file:'.self::$appConfigFile.
                '|version:'.$version.'|group:'.$group.'|provider_service:'.$consumerInterface.'|route:'.$route.'|addr_list:'.$addressList.'|timeout:'.$ioTimeOut);
        }
        catch (\Exception $e)
        {
            self::$logger->error('consumer_app:'.self::$appName.'|app_config_file:'.self::$appConfigFile.
                '|version:'.$version.'|group:'.$group.'|provider_service:'.$consumerInterface.'|errmsg:'. $e->getMessage().'|exceptionmsg:'.$e);
            throw new ConsumerException($e->getMessage(), $e);
        }
    	return $ret;
    }
}