client/Apache.ShenYu.Client/Registers/ShenyuZookeeperRegister.cs (130 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Apache.ShenYu.Client.Models.DTO;
using Apache.ShenYu.Client.Options;
using Apache.ShenYu.Client.Utils;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using org.apache.zookeeper;
namespace Apache.ShenYu.Client.Registers
{
public class ShenyuZookeeperRegister : ShenyuAbstractRegister
{
private readonly ILogger<ShenyuZookeeperRegister> _logger;
private ShenyuOptions _shenyuOptions;
private ZookeeperClient _zkClient;
private Dictionary<string, string> _nodeDataMap = new Dictionary<string, string>();
public ShenyuZookeeperRegister(ILogger<ShenyuZookeeperRegister> logger)
{
_logger = logger;
}
public override Task Init(ShenyuOptions shenyuOptions)
{
if (string.IsNullOrEmpty(shenyuOptions.Register.ServerList))
{
throw new System.ArgumentException("serverList can not be null.");
}
var serverList = shenyuOptions.Register.ServerList;
this._shenyuOptions = shenyuOptions;
//props
var props = shenyuOptions.Register.Props;
int sessionTimeout = Convert.ToInt32(props.GetValueOrDefault(Constants.RegisterConstants.SessionTimeout, "3000"));
int connectionTimeout = Convert.ToInt32(props.GetValueOrDefault(Constants.RegisterConstants.ConnectionTimeout, "3000"));
int operatingTimeout = Convert.ToInt32(props.GetValueOrDefault(Constants.RegisterConstants.OperatingTimeout, "1000"));
ZkOptions zkConfig = new ZkOptions(serverList);
zkConfig.SetOperatingTimeout(operatingTimeout)
.SetSessionTimeout(sessionTimeout)
.SetConnectionTimeout(connectionTimeout);
props.TryGetValue(Constants.RegisterConstants.Digest, out string digest);
if (!string.IsNullOrEmpty(digest))
{
zkConfig.SetDigest(digest);
}
this._zkClient = new ZookeeperClient(zkConfig);
this._zkClient.SubscribeStatusChange(async (client, connectionStateChangeArgs) =>
{
switch (connectionStateChangeArgs.State)
{
case Watcher.Event.KeeperState.Disconnected:
case Watcher.Event.KeeperState.Expired:
if (client.WaitForKeeperState(Watcher.Event.KeeperState.SyncConnected,
zkConfig.ConnectionSpanTimeout))
{
foreach (var node in _nodeDataMap)
{
var existStat = await _zkClient.ExistsAsync(node.Key);
if (!existStat)
{
var pathArr = node.Key.Trim('/').Split('/');
//create parent
if (pathArr.Length > 1) {
var parentPath = node.Key.TrimEnd('/').Substring(0, node.Key.TrimEnd('/').LastIndexOf("/"));
await _zkClient.CreateWithParentAsync(parentPath, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
await _zkClient.CreateOrUpdateAsync(node.Key,
Encoding.UTF8.GetBytes(node.Value),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
_logger.LogInformation("zookeeper client register success: {}", node.Value);
}
}
}
else
{
_logger.LogError("zookeeper server disconnected and retry connect fail");
}
break;
case Watcher.Event.KeeperState.AuthFailed:
_logger.LogError("zookeeper server AuthFailed");
break;
case Watcher.Event.KeeperState.SyncConnected:
case Watcher.Event.KeeperState.ConnectedReadOnly:
break;
}
await Task.CompletedTask;
});
return Task.CompletedTask;
}
public override async Task PersistInterface(MetaDataRegisterDTO metadata)
{
string contextPath = ContextPathUtils.BuildRealNode(metadata.contextPath, metadata.appName);
await RegisterMetadataAsync(contextPath, metadata);
}
public override async Task PersistURI(URIRegisterDTO registerDTO)
{
string contextPath = ContextPathUtils.BuildRealNode(registerDTO.contextPath, registerDTO.appName);
await RegisterURIAsync(contextPath, registerDTO);
}
private async Task RegisterURIAsync(string contextPath, URIRegisterDTO registerDTO)
{
string uriNodeName = BuildURINodeName(registerDTO);
string uriPath = RegisterPathConstants.BuildURIParentPath(registerDTO.rpcType, contextPath);
string realNode = RegisterPathConstants.BuildRealNode(uriPath, uriNodeName);
await _zkClient.CreateWithParentAsync(uriPath, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
string nodeData = JsonConvert.SerializeObject(registerDTO, Formatting.None, new JsonSerializerSettings
{
NullValueHandling = NullValueHandling.Ignore
});
_nodeDataMap[realNode] = nodeData;
await _zkClient.CreateOrUpdateAsync(realNode, Encoding.UTF8.GetBytes(nodeData), ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL);
}
private async Task RegisterMetadataAsync(string contextPath, MetaDataRegisterDTO metadata)
{
string metadataNodeName = BuildMetadataNodeName(metadata);
string metaDataPath = RegisterPathConstants.BuildMetaDataParentPath(metadata.rpcType, contextPath);
string realNode = RegisterPathConstants.BuildRealNode(metaDataPath, metadataNodeName);
//create parent node
await _zkClient.CreateWithParentAsync(metaDataPath, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
var metadataStr = JsonConvert.SerializeObject(metadata, Formatting.None, new JsonSerializerSettings
{
NullValueHandling = NullValueHandling.Ignore
});
await _zkClient.CreateOrUpdateAsync(realNode, Encoding.UTF8.GetBytes(metadataStr), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
_logger.LogInformation("{} zookeeper client register metadata success: {}", metadata.rpcType, metadata);
}
public override async Task Close()
{
this._zkClient.Dispose();
await Task.CompletedTask;
}
}
}