csharp/rocketmq-client-csharp/ClientManager.cs (158 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using Proto = Apache.Rocketmq.V2;
using System;
using System.Threading;
using System.Threading.Tasks;
using grpcLib = Grpc.Core;
using System.Collections.Generic;
using System.Linq;
namespace Org.Apache.Rocketmq
{
public class ClientManager : IClientManager
{
private readonly Client _client;
private readonly Dictionary<Endpoints, IRpcClient> _rpcClients;
private readonly ReaderWriterLockSlim _clientLock;
public ClientManager(Client client)
{
_client = client;
_rpcClients = new Dictionary<Endpoints, IRpcClient>();
_clientLock = new ReaderWriterLockSlim();
}
private IRpcClient GetRpcClient(Endpoints endpoints)
{
_clientLock.EnterReadLock();
try
{
// client exists, return in advance.
if (_rpcClients.TryGetValue(endpoints, out var cachedClient))
{
return cachedClient;
}
}
finally
{
_clientLock.ExitReadLock();
}
_clientLock.EnterWriteLock();
try
{
// client exists, return in advance.
if (_rpcClients.TryGetValue(endpoints, out var cachedClient))
{
return cachedClient;
}
// client does not exist, generate a new one
var client = new RpcClient(endpoints, _client.GetClientConfig().SslEnabled);
_rpcClients.Add(endpoints, client);
return client;
}
finally
{
_clientLock.ExitWriteLock();
}
}
public async Task Shutdown()
{
_clientLock.EnterReadLock();
try
{
var tasks = _rpcClients.Select(item => item.Value.Shutdown()).ToList();
await Task.WhenAll(tasks);
}
finally
{
_clientLock.ExitReadLock();
}
}
public grpcLib::AsyncDuplexStreamingCall<Proto::TelemetryCommand, Proto::TelemetryCommand> Telemetry(
Endpoints endpoints)
{
return GetRpcClient(endpoints).Telemetry(_client.Sign());
}
public async Task<RpcInvocation<Proto.QueryRouteRequest, Proto.QueryRouteResponse>> QueryRoute(
Endpoints endpoints, Proto.QueryRouteRequest request, TimeSpan timeout)
{
var metadata = _client.Sign();
var response = await GetRpcClient(endpoints).QueryRoute(metadata, request, timeout);
return new RpcInvocation<Proto.QueryRouteRequest, Proto.QueryRouteResponse>(request, response, metadata);
}
public async Task<RpcInvocation<Proto.HeartbeatRequest, Proto.HeartbeatResponse>> Heartbeat(Endpoints endpoints,
Proto.HeartbeatRequest request, TimeSpan timeout)
{
var metadata = _client.Sign();
var response = await GetRpcClient(endpoints).Heartbeat(metadata, request, timeout);
return new RpcInvocation<Proto.HeartbeatRequest, Proto.HeartbeatResponse>(request, response, metadata);
}
public async Task<RpcInvocation<Proto.NotifyClientTerminationRequest, Proto.NotifyClientTerminationResponse>>
NotifyClientTermination(Endpoints endpoints, Proto.NotifyClientTerminationRequest request, TimeSpan timeout)
{
var metadata = _client.Sign();
var response = await GetRpcClient(endpoints).NotifyClientTermination(metadata, request, timeout);
return new RpcInvocation<Proto.NotifyClientTerminationRequest, Proto.NotifyClientTerminationResponse>(
request, response, metadata);
}
public async Task<RpcInvocation<Proto.RecallMessageRequest, Proto.RecallMessageResponse>>
RecallMessage(Endpoints endpoints, Proto.RecallMessageRequest request, TimeSpan timeout)
{
var metadata = _client.Sign();
var response = await GetRpcClient(endpoints).RecallMessage(metadata, request, timeout);
return new RpcInvocation<Proto.RecallMessageRequest, Proto.RecallMessageResponse>(
request, response, metadata);
}
public async Task<RpcInvocation<Proto.SendMessageRequest, Proto.SendMessageResponse>> SendMessage(
Endpoints endpoints, Proto::SendMessageRequest request, TimeSpan timeout)
{
var metadata = _client.Sign();
var response = await GetRpcClient(endpoints).SendMessage(metadata, request, timeout);
return new RpcInvocation<Proto.SendMessageRequest, Proto.SendMessageResponse>(
request, response, metadata);
}
public async Task<RpcInvocation<Proto.QueryAssignmentRequest, Proto.QueryAssignmentResponse>> QueryAssignment(
Endpoints endpoints, Proto.QueryAssignmentRequest request, TimeSpan timeout)
{
var metadata = _client.Sign();
var response = await GetRpcClient(endpoints).QueryAssignment(metadata, request, timeout);
return new RpcInvocation<Proto.QueryAssignmentRequest, Proto.QueryAssignmentResponse>(
request, response, metadata);
}
public async Task<RpcInvocation<Proto.ReceiveMessageRequest, List<Proto.ReceiveMessageResponse>>>
ReceiveMessage(Endpoints endpoints, Proto.ReceiveMessageRequest request, TimeSpan timeout)
{
var metadata = _client.Sign();
var response = await GetRpcClient(endpoints).ReceiveMessage(metadata, request, timeout);
return new RpcInvocation<Proto.ReceiveMessageRequest, List<Proto.ReceiveMessageResponse>>(
request, response, metadata);
}
public async Task<RpcInvocation<Proto.AckMessageRequest, Proto.AckMessageResponse>> AckMessage(
Endpoints endpoints, Proto.AckMessageRequest request, TimeSpan timeout)
{
var metadata = _client.Sign();
var response = await GetRpcClient(endpoints).AckMessage(metadata, request, timeout);
return new RpcInvocation<Proto.AckMessageRequest, Proto.AckMessageResponse>(
request, response, metadata);
}
public async Task<RpcInvocation<Proto.ChangeInvisibleDurationRequest, Proto.ChangeInvisibleDurationResponse>>
ChangeInvisibleDuration(Endpoints endpoints,
Proto.ChangeInvisibleDurationRequest request, TimeSpan timeout)
{
var metadata = _client.Sign();
var response = await GetRpcClient(endpoints).ChangeInvisibleDuration(metadata, request, timeout);
return new RpcInvocation<Proto.ChangeInvisibleDurationRequest, Proto.ChangeInvisibleDurationResponse>(
request, response, metadata);
}
public async Task<RpcInvocation<Proto.ForwardMessageToDeadLetterQueueRequest, Proto.ForwardMessageToDeadLetterQueueResponse>>
ForwardMessageToDeadLetterQueue(Endpoints endpoints,
Proto.ForwardMessageToDeadLetterQueueRequest request, TimeSpan timeout)
{
var metadata = _client.Sign();
var response = await GetRpcClient(endpoints).ForwardMessageToDeadLetterQueue(metadata, request, timeout);
return new RpcInvocation<Proto.ForwardMessageToDeadLetterQueueRequest, Proto.ForwardMessageToDeadLetterQueueResponse>(
request, response, metadata);
}
public async Task<RpcInvocation<Proto.EndTransactionRequest, Proto.EndTransactionResponse>> EndTransaction(
Endpoints endpoints, Proto.EndTransactionRequest request, TimeSpan timeout)
{
var metadata = _client.Sign();
var response = await GetRpcClient(endpoints).EndTransaction(metadata, request, timeout);
return new RpcInvocation<Proto.EndTransactionRequest, Proto.EndTransactionResponse>(
request, response, metadata);
}
}
}