client/Apache.ShenYu.Client/Utils/Etcd/EtcdClientUtils.cs (145 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using dotnet_etcd;
using Etcdserverpb;
using Google.Protobuf;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
namespace Apache.ShenYu.Client.Utils
{
/// <summary>
/// etcd client.
/// </summary>
public class EtcdClientUtils
{
private readonly EtcdClient _client;
private readonly string _authToken;
private readonly bool _authNeed;
private readonly Grpc.Core.Metadata _metadata;
private long _globalLeaseId;
private ILogger _logger = NullLogger<EtcdClient>.Instance;
public EtcdClientUtils(EtcdOptions options)
{
//cluster:like "https://localhost:23790,https://localhost:23791,https://localhost:23792"
this._client = new EtcdClient(options.Address);
//auth
if (!string.IsNullOrEmpty(options.UserName) && !string.IsNullOrEmpty(options.Password))
{
var authRes = this._client.Authenticate(new Etcdserverpb.AuthenticateRequest()
{
Name = options.UserName,
Password = options.Password,
});
_authToken = authRes.Token;
_authNeed = true;
_metadata = _authNeed ? new Grpc.Core.Metadata() { new Grpc.Core.Metadata.Entry("token", _authToken) } : null;
}
InitLease(options);
}
/// <summary>
/// rent
/// </summary>
private void InitLease(EtcdOptions options)
{
try
{
// create rent id to bind
var response = this._client.LeaseGrant(new Etcdserverpb.LeaseGrantRequest()
{
TTL = options.TTL
});
this._globalLeaseId = response.ID;
var tokenSource = new CancellationTokenSource();
this._client.LeaseKeepAlive(this._globalLeaseId, tokenSource.Token);
//this._client.LeaseKeepAlive(new LeaseKeepAliveRequest()
//{
// ID = _globalLeaseId
//}, (x) =>
//{
// Console.WriteLine(x.ID);
//}, tokenSource.Token, _metadata);
}
catch (Exception ex)
{
_logger.LogError("Init Lease error", ex);
}
}
/// <summary>
/// dispose client
/// </summary>
public void Close()
{
this._client.Dispose();
}
/// <summary>
/// get data
/// </summary>
/// <param name="request"></param>
/// <returns></returns>
public RangeResponse Get(RangeRequest request)
{
var response = _client.Get(request, _metadata);
return response;
}
/// <summary>
/// get data
/// </summary>
/// <param name="key"></param>
/// <returns></returns>
public string GetVal(string key)
{
return _client.GetVal(key, _metadata);
}
/// <summary>
/// get data async
/// </summary>
/// <param name="key"></param>
/// <returns></returns>
public async Task<string> GetValAsync(string key)
{
return await _client.GetValAsync(key, _metadata);
}
/// <summary>
/// get rangevalues
/// </summary>
/// <param name="prefixKey"></param>
/// <returns></returns>
public IDictionary<string, string> GetRangeVals(string prefixKey)
{
return _client.GetRangeVal(prefixKey, _metadata);
}
/// <summary>
/// get rangevalues async
/// </summary>
/// <param name="prefixKey"></param>
/// <returns></returns>
public async Task<IDictionary<string, string>> GetRangeValsAsync(string prefixKey)
{
return await _client.GetRangeValAsync(prefixKey, _metadata);
}
/// <summary>
/// delete key
/// </summary>
/// <param name="key"></param>
/// <returns></returns>
public long Delete(string key)
{
var response = _client.Delete(key, _metadata);
return response.Deleted;
}
/// <summary>
/// delete key async
/// </summary>
/// <param name="key"></param>
/// <returns></returns>
public async Task<long> DeleteAsync(string key)
{
var response = await _client.DeleteAsync(key, _metadata);
return response.Deleted;
}
/// <summary>
/// delete rangeKeys
/// </summary>
/// <param name="perfixKey"></param>
/// <returns></returns>
public long DeleteRange(string perfixKey)
{
var response = _client.DeleteRange(perfixKey, _metadata);
return response.Deleted;
}
/// <summary>
/// delete rangeKeys async
/// </summary>
/// <param name="perfixKey"></param>
/// <returns></returns>
public async Task<long> DeleteRangeAsync(string perfixKey)
{
var response = await _client.DeleteRangeAsync(perfixKey, _metadata);
return response.Deleted;
}
/// <summary>
/// put key-val
/// </summary>
/// <param name="key"></param>
/// <param name="value"></param>
/// <returns></returns>
public Etcdserverpb.PutResponse Put(string key, string value)
{
return _client.Put(key, value, _metadata);
}
/// <summary>
/// putasync key-val
/// </summary>
/// <param name="key"></param>
/// <param name="value"></param>
/// <returns></returns>
public async Task<Etcdserverpb.PutResponse> PutAsync(string key, string value)
{
return await _client.PutAsync(key, value, _metadata);
}
/// <summary>
/// put ke-val with leaseId
/// </summary>
/// <param name="key"></param>
/// <param name="value"></param>
/// <returns></returns>
public PutResponse PutEphemeral(string key, string value)
{
try
{
PutRequest request = new PutRequest()
{
Key = ByteString.CopyFromUtf8(key),
Value = ByteString.CopyFromUtf8(value),
Lease = this._globalLeaseId
};
var response = _client.Put(request, _metadata);
return response;
}
catch (Exception ex)
{
_logger.LogError("putEphemeral(key:{},value:{}) error.", key, value, ex);
}
return null;
}
/// <summary>
/// putasync ke-val with leaseId
/// </summary>
/// <param name="key"></param>
/// <param name="value"></param>
/// <returns></returns>
public async Task<PutResponse> PutEphemeralAsync(string key, string value)
{
try
{
PutRequest request = new PutRequest()
{
Key = ByteString.CopyFromUtf8(key),
Value = ByteString.CopyFromUtf8(value),
Lease = this._globalLeaseId
};
var response = await _client.PutAsync(request, _metadata);
return response;
}
catch (Exception ex)
{
_logger.LogError("putEphemeral(key:{},value:{}) error.", key, value, ex);
}
return null;
}
}
}