client/Apache.ShenYu.Client/Utils/Zookeeper/ZookeeperClient.cs (294 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ using System; using System.Collections.Generic; using System.Text; using System.Threading; using System.Threading.Tasks; using org.apache.zookeeper; using org.apache.zookeeper.data; #if !NET40 using TaskEx = System.Threading.Tasks.Task; #endif namespace Apache.ShenYu.Client.Utils { /// <summary> /// zookeeper client /// </summary> public class ZookeeperClient : Watcher, IZookeeperClient { private ZooKeeper _zookeeperClient; private ZkOptions _options; private ConnectionStateChangeHandler _connectionStateChangeHandler; private Event.KeeperState _currentState; private readonly AutoResetEvent _stateChangedCondition = new AutoResetEvent(false); private readonly object _zkEventLock = new object(); private bool _isDispose; /// <summary> /// create client /// </summary> /// <param name="connectionString"></param> public ZookeeperClient(string connectionString) : this(new ZkOptions(connectionString)) { } /// <summary> /// create client /// </summary> /// <param name="options"></param> public ZookeeperClient(ZkOptions options) { _options = options; _zookeeperClient = CreateZooKeeper(); } #region Public Method /// <summary> /// wait zk connect to give states /// </summary> /// <param name="states"></param> /// <param name="timeout"></param> /// <returns>success:true,fail:false</returns> public bool WaitForKeeperState(Event.KeeperState states, TimeSpan timeout) { var stillWaiting = true; while (_currentState != states) { if (!stillWaiting) { return false; } stillWaiting = _stateChangedCondition.WaitOne(timeout); } return true; } /// <summary> /// retry util zk connected /// </summary> /// <typeparam name="T"></typeparam> /// <param name="callable">execute zk action</param> /// <returns></returns> public async Task<T> RetryUntilConnected<T>(Func<Task<T>> callable) { var operationStartTime = DateTime.Now; while (true) { try { return await callable(); } catch (KeeperException.ConnectionLossException) { #if NET40 await TaskEx.Yield(); #else await Task.Yield(); #endif this.WaitForRetry(); } catch (KeeperException.SessionExpiredException) { #if NET40 await TaskEx.Yield(); #else await Task.Yield(); #endif this.WaitForRetry(); } if (DateTime.Now - operationStartTime > _options.OperatingSpanTimeout) { throw new TimeoutException( $"Operation cannot be retried because of retry timeout ({_options.OperatingSpanTimeout.TotalMilliseconds} milli seconds)"); } } } /// <summary> /// get give node data /// </summary> /// <param name="path"></param> /// <returns></returns> public async Task<IEnumerable<byte>> GetDataAsync(string path) { path = GetZooKeeperPath(path); return await RetryUntilConnected(async () => { var data = await _zookeeperClient.getDataAsync(path, false); return data?.Data; }); } /// <summary> /// node exists /// </summary> /// <param name="path"></param> /// <returns>if have return true,then return false。</returns> public async Task<bool> ExistsAsync(string path) { path = GetZooKeeperPath(path); return await RetryUntilConnected(async () => { var data = await _zookeeperClient.existsAsync(path, false); var exists = data != null; return exists; }); } /// <summary> /// create node /// </summary> /// <param name="path"></param> /// <param name="data"></param> /// <param name="acls"></param> /// <param name="createMode"></param> /// <returns></returns> /// <remarks> /// /// </remarks> public async Task<string> CreateAsync(string path, byte[] data, List<ACL> acls, CreateMode createMode) { path = GetZooKeeperPath(path); return await RetryUntilConnected(async () => { path = await _zookeeperClient.createAsync(path, data, acls, createMode); return path; }); } public async Task<string> CreateOrUpdateAsync(string path, byte[] data, List<ACL> acls, CreateMode createMode) { path = GetZooKeeperPath(path); return await RetryUntilConnected(async () => { var existsResult = await _zookeeperClient.existsAsync(path, false) != null; if (existsResult) { await _zookeeperClient.setDataAsync(path, data); } else { path = await _zookeeperClient.createAsync(path, data, acls, createMode); } return path; }); } /// <summary> /// /// </summary> /// <param name="path"></param> /// <param name="data"></param> /// <param name="acls"></param> /// <param name="createMode"></param> /// <returns></returns> public async Task<bool> CreateWithParentAsync(string path, byte[] data, List<ACL> acls, CreateMode createMode) { path = GetZooKeeperPath(path); return await RetryUntilConnected(async () => { var paths = path.Trim('/').Split('/'); var cur = ""; foreach (var item in paths) { if (string.IsNullOrEmpty(item)) { continue; } cur += $"/{item}"; var existStat = await _zookeeperClient.existsAsync(cur, null); if (existStat != null) { continue; } if (cur.Equals(path)) { await _zookeeperClient.createAsync(cur, data, acls, createMode); } else { await _zookeeperClient.createAsync(cur, null, acls, createMode); } } return await Task.FromResult(true); }); } /// <summary> /// set node data /// </summary> /// <param name="path"></param> /// <param name="data"></param> /// <param name="version"></param> /// <returns>node stat</returns> public async Task<Stat> SetDataAsync(string path, byte[] data, int version = -1) { path = GetZooKeeperPath(path); return await RetryUntilConnected(async () => { var stat = await _zookeeperClient.setDataAsync(path, data, version); return stat; }); } /// <summary> /// delete node /// </summary> /// <param name="path"></param> /// <param name="version"></param> public async Task DeleteAsync(string path, int version = -1) { path = GetZooKeeperPath(path); await RetryUntilConnected(async () => { await _zookeeperClient.deleteAsync(path, version); return 0; }); } /// <summary> /// subscribe connect stat change /// </summary> /// <param name="listener"></param> public void SubscribeStatusChange(ConnectionStateChangeHandler listener) { _connectionStateChangeHandler += listener; } /// <summary> /// unsubscribe connect stat change /// </summary> /// <param name="listener"></param> public void UnSubscribeStatusChange(ConnectionStateChangeHandler listener) { _connectionStateChangeHandler -= listener; } #endregion Public Method #region Overrides of Watcher /// <summary>Processes the specified event.</summary> /// <param name="watchedEvent">The event.</param> /// <returns></returns> public override async Task process(WatchedEvent watchedEvent) { if (_isDispose) return; await OnConnectionStateChange(watchedEvent); } #endregion Overrides of Watcher #region Implementation of IDisposable /// <summary>execute dispose or reset</summary> public void Dispose() { if (_isDispose) return; _isDispose = true; lock (_zkEventLock) { TaskEx.Run(async () => { await _zookeeperClient.closeAsync().ConfigureAwait(false); }).ConfigureAwait(false) .GetAwaiter().GetResult(); } } #endregion Implementation of IDisposable #region Private Method private async Task OnConnectionStateChange(WatchedEvent watchedEvent) { if (_isDispose) return; var state = watchedEvent.getState(); SetCurrentState(state); if (state == Event.KeeperState.Expired) { await ReConnect(); } _stateChangedCondition.Set(); if (_connectionStateChangeHandler == null) return; await _connectionStateChangeHandler(this, new ConnectionStateChangeArgs { State = state }); } private ZooKeeper CreateZooKeeper() { //log write to file switch ZooKeeper.LogToFile = _options.LogToFile; var zkClient = new ZooKeeper(_options.ConnectionString, (int)_options.SessionSpanTimeout.TotalMilliseconds, this, _options.ReadOnly); if (!string.IsNullOrEmpty(_options.Digest)) { zkClient.addAuthInfo("digest",Encoding.UTF8.GetBytes(_options.Digest)); } var operationStartTime = DateTime.Now; while (true) { if (zkClient.getState() == ZooKeeper.States.CONNECTING) { Thread.Sleep(100); }else if(zkClient.getState() == ZooKeeper.States.CONNECTED ||zkClient.getState() == ZooKeeper.States.CONNECTEDREADONLY) { return zkClient; } if (DateTime.Now - operationStartTime > _options.OperatingSpanTimeout) { throw new TimeoutException( $"connect cannot be retried because of retry timeout ({_options.OperatingSpanTimeout.TotalMilliseconds} milli seconds)"); } } } private async Task ReConnect() { if (!Monitor.TryEnter(_zkEventLock, _options.ConnectionSpanTimeout)) return; try { if (_zookeeperClient != null) { try { await _zookeeperClient.closeAsync(); } catch { // ignored } } _zookeeperClient = CreateZooKeeper(); } finally { Monitor.Exit(_zkEventLock); } } private void SetCurrentState(Event.KeeperState state) { lock (this) { _currentState = state; } } private string GetZooKeeperPath(string path) { var basePath = _options.BaseRoutePath ?? "/"; if (!basePath.StartsWith("/")) basePath = basePath.Insert(0, "/"); basePath = basePath.TrimEnd('/'); if (!path.StartsWith("/")) path = path.Insert(0, "/"); path = $"{basePath}{path.TrimEnd('/')}"; return string.IsNullOrEmpty(path) ? "/" : path; } /// <summary> /// wait util zk connect success,timeout is in options /// </summary> private void WaitForRetry() { WaitForKeeperState(Watcher.Event.KeeperState.SyncConnected, _options.OperatingSpanTimeout); } #endregion Private Method } }