registry/automatic/ZookeeperClient.php (196 lines of code) (raw):

<?php /** * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ namespace com\fenqile\fsof\registry\automatic; class ZookeeperClient { /** * @var Zookeeper */ private $zookeeper = null; private $address; private $func = null; private $ephemeral = false; private $zkFile = null; private $waiteForConnectStateTimes = 20; private $logger; public function __construct() { $this->logger = \Logger::getLogger(__CLASS__); } public function __destruct() { $this->closeLog(); unset($this->zookeeper); $this->logger->info('zookeeperClient destruct'); } public function connect_cb($type, $event, $string) { // Watcher gets consumed so we need to set a new one $this->logger->debug("connect state:{$event}"); $params = array($type, $event, $string); if(isset($this->func)) { $this->logger->warn("call connect state"); call_user_func_array($this->func, $params); } $this->logger->debug("connect_cb end"); } public function connectZk($address, $ephemeral = false) { $ret = true; $this->address = $address; $this->ephemeral = $ephemeral; try { if ($ephemeral) { $this->zookeeper = new \Zookeeper($this->address, array($this,'connect_cb')); } else { $this->zookeeper = new \Zookeeper($this->address); if($this->zookeeper) { while(\Zookeeper::CONNECTED_STATE != $this->zookeeper->getState() &&($this->waiteForConnectStateTimes > 0)) { //等待连接状态 $this->waiteForConnectStateTimes--; $this->logger->debug("wait for connect state"); usleep(50000); } } } if(empty($this->zookeeper)) { $ret = false; } } catch (\Exception $e) { $ret = false; $this->logger->error($e->getMessage().'|address:'.$this->address); } return $ret; } public function registerCallFunc($func) { $this->func = $func; } /** * 创建节点。 * * @param path 节点的绝对路径,如/fsof/URL.Encode("com.fenqile.example.calculate.AddService")/providers/URL.Encoder(provider url)。 * @param ephemeral,是否是临时节点,所有树干节点都为固定节点,只有叶子节点可以设为临时节点,没有设置,默认为固定节点 */ public function create($path) { $ret = $this->set($path); $this->logger->info('zookeeperClient create|path:'.$path.'|ret:'.$ret); return $ret; } /** * 删除节点,临时节点不需要删除,zookeeper在与对应的 client的连接断开后自动删除 * @param path 节点的绝对路径,如/fsof/URL.Encode("com.fenqile.example.calculate.AddService")/providers/URL.Encoder(provider url)。 */ public function delete($path, $version=-1) { if(isset($this->zookeeper)) { if($this->zookeeper->exists($path)) { $this->zookeeper->delete($path, $version); } } return true; } private function set($path, $value=null) { if(isset($this->zookeeper)) { if($this->zookeeper->exists($path)) { $this->zookeeper->set($path, $value); } else { $this->makePath($path); $this->makeNode($path, $value, $this->ephemeral); } } return true; } /** * 获取指定节点的所有childe节点 * * @param path 节点的绝对路径,如/fsof/URL.Encode("com.fenqile.example.calculate.AddService")/providers。 */ public function getChildren($path) { if((strlen($path) > 1) && preg_match('@/$@', $path)) { // remove trailing / $path = substr($path, 0, -1); } return $this->zookeeper->getChildren($path); } /** * 为指定节点设置一个 childListener,用于监听其child变动情况 * * @param path 节点的绝对路径,如/fsof/URL.Encode("com.fenqile.example.calculate.AddService")/providers。 * @param ChildListener */ public function addChildListener($path, $childListener) { } /** * 删除指定节点上的 childListener * * @param path 节点的绝对路径,如/fsof/URL.Encode("com.fenqile.example.calculate.AddService")/providers。 * @param ChildListener */ public function removeChildListener($path, $childeListener) { } /** * 设置一个client的state状态监听器,如在连接建立或重建时,主动向zookeeper server拉一次数据 * * @param StateListener */ public function addStateListener($stateListener) { } /** * 删除一个client的state状态监听器 * * @param StateListener */ public function removeStateListener($stateListener) { } public function isConnected() { $ret = $this->zookeeper->getState(); if(\Zookeeper::CONNECTED_STATE == $ret) { return true; } else { return false; } } public function close() { } public function getUrl() { } /** * Equivalent of "mkdir -p" on ZooKeeper * * @param string $path The path to the node * @param string $value The value to assign to each new node along the path * * @return bool */ private function makePath($path, $value = '') { $parts = explode('/', $path); $parts = array_filter($parts); $subpath = ''; while (count($parts) > 1) { $subpath .= '/' . array_shift($parts); if (!$this->zookeeper->exists($subpath)) { $this->makeNode($subpath, $value); } } } /** * Create a node on ZooKeeper at the given path * * @param string $path The path to the node * @param string $value The value to assign to the new node * @param bool $endNode The value to assign to the end node * @param array $params Optional parameters for the Zookeeper node. * By default, a public node is created * * @return string the path to the newly created node or null on failure */ private function makeNode($path, $value, $ephemeral = false, array $params = array()) { //$this->logger->debug("makeNode(".$path.",".$value.",".($ephemeral?1:0).",".json_encode($params).")"); if(empty($params)) { $params = array( array( 'perms' => \Zookeeper::PERM_ALL, 'scheme' => 'world', 'id' => 'anyone', ) ); } if ($ephemeral) { return $this->zookeeper->create($path, $value, $params , \Zookeeper::EPHEMERAL); } else { return $this->zookeeper->create($path, $value, $params); } } public function setLogFile($file, $logLevel = 2) { $this->zkFile = fopen($file,"a+"); if($this->zkFile && $this->zookeeper) { $this->zookeeper->setDebugLevel($logLevel); $this->zookeeper->setLogStream($this->zkFile); } } private function closeLog() { if(!empty($this->zkFile)) { fclose($this->zkFile); } } }