nlsCppSdk/transport/connectedPool.h (185 lines of code) (raw):
/*
* Copyright 2025 Alibaba Group Holding Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef NLS_SDK_CONNECTED_POOL_H
#define NLS_SDK_CONNECTED_POOL_H
#include <list>
#include <vector>
#if defined(_MSC_VER)
#include <windows.h>
#else
#include <pthread.h>
#endif
#include "SSLconnect.h"
#include "connectNode.h"
#include "event2/util.h"
#include "flowingSynthesizerParam.h"
#include "iNlsRequest.h"
#include "nlog.h"
#include "speechRecognizerParam.h"
#include "speechSynthesizerParam.h"
#include "speechTranscriberParam.h"
namespace AlibabaNls {
#ifndef INVALID_SOCKET
#define INVALID_SOCKET -1
#endif
enum ConnectedStatus {
PreNodeInvalid = 0,
PreNodeToBeCreated,
PreNodeCreated,
PreNodeConnected,
PreNodeStarted,
};
struct ConnectedNodeProcess {
public:
explicit ConnectedNodeProcess()
: status(PreNodeInvalid),
startTimestamp(0),
workableTimestamp(0),
tokenExpirationTimestamp(0),
socketFd(INVALID_SOCKET),
sslHandle(NULL),
canPick(false),
ttsVersion(ShortTts),
request(NULL),
isAbnormal(false),
shouldRelease(false),
shouldPreconnect(false),
curRequest(NULL),
curRequestInvalid(false),
sdkName(""),
startedResponse(""){};
~ConnectedNodeProcess() {
if (request) {
delete request;
request = NULL;
}
curRequest = NULL;
curRequestInvalid = false;
isAbnormal = false;
shouldRelease = false;
shouldPreconnect = false;
canPick = false;
};
NlsType type;
ConnectedStatus status;
uint64_t startTimestamp;
uint64_t workableTimestamp;
uint64_t tokenExpirationTimestamp;
/* socketFd & sslHandle 是判断节点的条件 */
evutil_socket_t socketFd;
SSLconnect *sslHandle;
bool canPick; /* ConnectedNode中SSL可被取走 */
int ttsVersion;
/* socketFd & sslHandle 所在的最早的request, 它将在ConnectedPool中释放,
* 而不是在交互过程中释放 */
INlsRequest *request;
bool isAbnormal;
/* 需要从Pool中删除, 并反初始化此request */
bool shouldRelease;
/* 需要从PrestartedNodePool中删除, 重新建连加入PreconectedNodePool中 */
bool shouldPreconnect;
/* socketFd & sslHandle 当前所属的request */
INlsRequest *curRequest;
/* push完后还未finish, 标记此curRequest处于无效状态 */
bool curRequestInvalid;
std::string sdkName;
std::string startedResponse;
void clearNode() {
status = PreNodeToBeCreated;
startTimestamp = 0;
workableTimestamp = 0;
tokenExpirationTimestamp = 0;
socketFd = INVALID_SOCKET;
sslHandle = NULL;
canPick = false;
ttsVersion = 0;
request = NULL;
isAbnormal = false;
shouldRelease = false;
shouldPreconnect = false;
curRequest = NULL;
curRequestInvalid = false;
sdkName.clear();
startedResponse.clear();
}
};
struct ConnectedPoolProcess {
public:
explicit ConnectedPoolProcess() : type(TypeRealTime), work(false){};
~ConnectedPoolProcess() { work = false; };
NlsType type;
/* 此ConnectedPoolProcess开始工作的标记 */
bool work;
std::list<int> prestartedIndexList;
std::list<int> preconnectedIndexList;
std::vector<struct ConnectedNodeProcess> prestartedRequests;
std::vector<struct ConnectedNodeProcess> preconnectedRequests;
};
class ConnectedPool {
public:
/**
* @brief 预连接池将会每1秒检查每个节点
* @param maxNumber 预连接池中每类交互的最大预连接的数量和最大正在交互的数量
* @param timeoutMs 连接到交互服务器但是未启动交互的预连接超时时间
* @param requestedTimeoutMs 连接到交互服务器且启动交互的预连接超时时间
*/
ConnectedPool(unsigned int maxNumber, unsigned int timeoutMs,
unsigned int requestedTimeoutMs);
~ConnectedPool();
#ifdef _MSC_VER
static unsigned __stdcall loopConnectedPoolEventCallback(LPVOID arg);
#else
static void *loopConnectedPoolEventCallback(void *arg);
#endif
static void connectPoolEventCallback(evutil_socket_t socketFd, short event,
void *arg);
static void nodeReleaseEventCallback(evutil_socket_t socketFd, short event,
void *arg);
/**
* @brief 启动预连接池的工作线程
* @return 成功获取则Success
*/
int startConnectedPool();
/**
* @brief 结束预连接池的工作线程
* @return 成功获取则Success
*/
int stopConnectedPool();
/**
* @brief 取走一个预连接, 并设置到INlsRequest中. 取走started
* @return 成功获取则true, 没有可用预连接则false
*/
bool popPrestartedNode(INlsRequest *request, NlsType type);
/**
* @brief 取走一个预连接, 并设置到INlsRequest中. 取走connected
* @return 成功获取则true, 没有可用预连接则false
*/
bool popPreconnectedNode(INlsRequest *request, NlsType type);
/**
* @brief 将连接的网络连接相关的节点进行存储, 存储的SSL暂时不可用
* @return 成功存储节点则true, 否则false
*/
bool pushPreconnectedNode(INlsRequest *request, NlsType type,
bool newNode = false);
/**
* @brief 将完成建连的交互连接相关的节点进行存储, 存储的SSL暂时不可用
* @return 成功存储节点则true, 否则false
*/
bool pushPrestartedNode(INlsRequest *request, NlsType type,
bool newNode = false);
/**
* @brief PreconnectedNode从preconnected池子移动到prestarted池子
* @return 成功存储节点则true, 否则false
*/
bool pushPrestartedNodeFromPreconnected(INlsRequest *request, NlsType type);
/**
* @brief 此request使用的SSL是否是ConnectedPool中的有效SSL
*/
bool sslBelongToPool(INlsRequest *request, NlsType type,
bool &oriRequestIsAbnormal, bool &requestInPool);
/**
* @brief 此request使用的SSL在ConnectedPool中标记为异常, 后续需要删除
*/
void curRequestIsAbnormal(INlsRequest *request, NlsType type);
/**
* @brief 使存储的SSL变可用可取走
*/
void finishPushPreNode(NlsType type, evutil_socket_t curSocketFd,
SSLconnect *curSslHandle, int index,
INlsRequest *request);
/**
* @brief 此request是否存储在ConnectedPool中
*/
bool requestInPool(INlsRequest *request, NlsType type);
/**
* @brief 已经释放的request需要从Pool中删除
*/
bool deletePreNodeByRequest(INlsRequest *request, NlsType type);
/**
* @brief 从Pool中找到SSL所属request并删除
*/
bool deletePreNodeBySSL(SSLconnect *curSslHandle, NlsType type);
private:
int getNumberOfThisTypeNodes(NlsType type, int &prestarted,
int &preconnected);
int getNumberOfPreconnectedNodes(NlsType type);
int getNumberOfPrestartedNodes(NlsType type);
int initThisNodesPool(NlsType type);
bool popOnePreconnectedNode(INlsRequest *request, NlsType type);
bool popOnePrestartedNode(INlsRequest *request, NlsType type);
void deletePreNode(std::vector<struct ConnectedNodeProcess> *pool);
int timeoutPrestartedNode(std::vector<struct ConnectedNodeProcess> *pool);
int timeoutPreconnectedNode(std::vector<struct ConnectedNodeProcess> *pool);
void deleteOrPreconnectNodeShouldReleased(
std::vector<struct ConnectedNodeProcess> *pool, std::string name);
void preconnectNodeByRequest(INlsRequest *request);
void showEveryNode(std::vector<struct ConnectedNodeProcess> *pool,
std::string name);
std::string getStatusStr(ConnectedStatus status);
void insertListInOrder(std::list<int> &lst, int a);
void removeElement(std::list<int> &lst, int a);
int popListFront(std::list<int> &lst);
unsigned int _maxPreconnectedNumber;
unsigned int _preconnectedTimeoutMs;
unsigned int _prerequestedTimeoutMs;
#ifdef _MSC_VER
unsigned _poolWorkThreadId;
HANDLE _poolWorkThreadHandle;
#else
pthread_t _poolWorkThreadId;
#endif
struct event_base *_poolWorkBase;
struct event *_connectPoolEvent;
struct timeval _connectPoolTimerTv; // 默认每1秒检查下所有节点
bool _connectPoolTimerFlag;
struct event *_nodeReleaseEvent;
struct ConnectedPoolProcess _fssRequests;
struct ConnectedPoolProcess _srRequests;
struct ConnectedPoolProcess _stRequests;
struct ConnectedPoolProcess _syRequests;
#if defined(_MSC_VER)
HANDLE _lock;
#else
pthread_mutex_t _lock;
#endif
};
} // namespace AlibabaNls
#endif // NLS_SDK_CONNECTED_POOL_H