nlsCppSdk/transport/connectedPool.cpp (2,307 lines of code) (raw):
/*
* Copyright 2025 Alibaba Group Holding Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifdef _MSC_VER
#include <process.h>
#include <windows.h>
#else
#include <pthread.h>
#include <sched.h>
#include <signal.h>
#include <sys/prctl.h>
#include <sys/syscall.h>
#include <sys/sysinfo.h>
#include <unistd.h>
#endif
#include "connectedPool.h"
#include "flowingSynthesizerRequest.h"
#include "nlog.h"
#include "nlsEventNetWork.h"
#include "nlsRequestParamInfo.h"
#include "speechRecognizerRequest.h"
#include "speechSynthesizerRequest.h"
#include "speechTranscriberRequest.h"
#include "text_utils.h"
#include "utility.h"
namespace AlibabaNls {
/**
 * Constructs the pool and starts its worker thread.
 *
 * Creates a private libevent base, a periodic timer event
 * (connectPoolEventCallback), a manually-activated event
 * (nodeReleaseEventCallback), arms the timer, initializes the pool mutex and
 * finally spawns the thread that dispatches the event base.
 *
 * @param maxNumber          stored in _maxPreconnectedNumber.
 * @param timeoutMs          stored in _preconnectedTimeoutMs.
 * @param requestedTimeoutMs stored in _prerequestedTimeoutMs.
 *
 * NOTE: failure to create the event base or the timer event terminates the
 * process via exit(1); failure to create the node-release event is only
 * logged.
 */
ConnectedPool::ConnectedPool(unsigned int maxNumber, unsigned int timeoutMs,
unsigned int requestedTimeoutMs)
: _maxPreconnectedNumber(maxNumber),
_preconnectedTimeoutMs(timeoutMs),
_prerequestedTimeoutMs(requestedTimeoutMs),
_poolWorkBase(NULL),
_connectPoolEvent(NULL),
_connectPoolTimerFlag(false),
_nodeReleaseEvent(NULL) {
// The event base must exist before any event can be bound to it.
_poolWorkBase = event_base_new();
if (NULL == _poolWorkBase) {
LOG_ERROR("ConnectedPool(%p) invoke event_base_new failed.", this);
exit(1);
}
int features = event_base_get_features(_poolWorkBase);
LOG_INFO("ConnectedPool(%p) create evbase(%p), get features %d", this,
_poolWorkBase, features);
// Tag each per-type sub-pool with the request type it serves.
_fssRequests.type = TypeStreamInputTts;
_srRequests.type = TypeAsr;
_stRequests.type = TypeRealTime;
_syRequests.type = TypeTts;
// Timer event driving connectPoolEventCallback; mandatory.
if (NULL == _connectPoolEvent) {
_connectPoolEvent = evtimer_new(
_poolWorkBase, ConnectedPool::connectPoolEventCallback, this);
if (NULL == _connectPoolEvent) {
LOG_ERROR("ConnectedPool(%p) new event(connectPoolEventCallback) failed.",
this);
exit(1);
}
}
// Fd-less event (-1) that is only triggered through event_active(); a
// creation failure is logged but not treated as fatal here.
if (_nodeReleaseEvent == NULL) {
_nodeReleaseEvent =
event_new(_poolWorkBase, -1, EV_READ,
ConnectedPool::nodeReleaseEventCallback, this);
if (NULL == _nodeReleaseEvent) {
LOG_ERROR("ConnectedPool(%p) new event(nodeReleaseEventCallback) failed.",
this);
}
}
// Timer period: 1.5 seconds.
_connectPoolTimerTv.tv_sec = 1;
_connectPoolTimerTv.tv_usec = 500000;
if (_connectPoolEvent) {
LOG_DEBUG("ConnectedPool(%p) evtimer_add with %d.%ds.", this,
_connectPoolTimerTv.tv_sec, _connectPoolTimerTv.tv_usec);
evtimer_add(_connectPoolEvent, &_connectPoolTimerTv);
_connectPoolTimerFlag = true;
}
// The mutex is initialized before the worker thread is started, so the
// callbacks that run on that thread always see a valid lock.
#if defined(_MSC_VER)
_lock = CreateMutex(NULL, FALSE, NULL);
_poolWorkThreadHandle =
(HANDLE)_beginthreadex(NULL, 0, loopConnectedPoolEventCallback,
(LPVOID)this, 0, &_poolWorkThreadId);
// The handle is closed right away; the thread itself keeps running.
CloseHandle(_poolWorkThreadHandle);
#else
pthread_mutex_init(&_lock, NULL);
// NOTE(review): no pthread_join/pthread_detach is visible in this part of
// the file; the destructor instead polls _poolWorkThreadId — confirm.
pthread_create(&_poolWorkThreadId, NULL, loopConnectedPoolEventCallback,
(void *)this);
#endif
}
/**
 * Tears the pool down.
 *
 * Waits (at most 50 polls of 1 ms each) for the worker thread to report its
 * exit by zeroing _poolWorkThreadId, then releases the nodes of every
 * sub-pool that was marked as working and finally destroys the pool mutex.
 */
ConnectedPool::~ConnectedPool() {
  LOG_DEBUG("ConnectedPool(%p) destructing ...", this);

  // Bounded wait for the worker thread; the thread clears the id on exit.
  for (int remainingPolls = 50; _poolWorkThreadId != 0 && remainingPolls > 0;
       --remainingPolls) {
    usleep(1 * 1000);
  }

  // Stream-input TTS nodes.
  if (_fssRequests.work) {
    deletePreNode(&_fssRequests.prestartedRequests);
    deletePreNode(&_fssRequests.preconnectedRequests);
  }

  // One-sentence recognizer (ASR) nodes.
  if (_srRequests.work) {
    deletePreNode(&_srRequests.prestartedRequests);
    deletePreNode(&_srRequests.preconnectedRequests);
  }

  // Real-time transcriber nodes.
  if (_stRequests.work) {
    deletePreNode(&_stRequests.prestartedRequests);
    deletePreNode(&_stRequests.preconnectedRequests);
  }

  // TTS nodes.
  if (_syRequests.work) {
    deletePreNode(&_syRequests.prestartedRequests);
    deletePreNode(&_syRequests.preconnectedRequests);
  }

#if defined(_MSC_VER)
  CloseHandle(_lock);
#else
  pthread_mutex_destroy(&_lock);
#endif

  LOG_DEBUG("ConnectedPool(%p) destructing done", this);
}
/**
 * Entry point of the pool worker thread.
 *
 * Runs the pool's libevent base until the loop ends, then frees the base and
 * clears _poolWorkThreadId (the destructor polls that field to detect that
 * this thread has finished).
 *
 * @param arg the owning ConnectedPool instance.
 * @return Success (Windows) or NULL (POSIX).
 */
#if defined(_MSC_VER)
unsigned __stdcall ConnectedPool::loopConnectedPoolEventCallback(LPVOID arg) {
#else
void *ConnectedPool::loopConnectedPoolEventCallback(void *arg) {
#endif
ConnectedPool *eventParam = static_cast<ConnectedPool *>(arg);
#if defined(__ANDROID__) || defined(__linux__)
// Block SIGPIPE for this thread; any failure while building or applying
// the signal mask terminates the process.
sigset_t signal_mask;
if (-1 == sigemptyset(&signal_mask)) {
LOG_ERROR("sigemptyset failed.");
exit(1);
}
if (-1 == sigaddset(&signal_mask, SIGPIPE)) {
LOG_ERROR("sigaddset failed.");
exit(1);
}
if (pthread_sigmask(SIG_BLOCK, &signal_mask, NULL) != 0) {
LOG_ERROR("pthread_sigmask failed.");
exit(1);
}
// Name the thread so it can be identified in debugging tools.
prctl(PR_SET_NAME, "connectedPoolEventThread");
#endif
LOG_DEBUG("ConnectedPool(%p) create loopConnectedPoolEventCallback.", arg);
// Blocks here until the event loop terminates (stopConnectedPool() calls
// event_base_loopbreak on this base).
if (eventParam->_poolWorkBase) {
LOG_DEBUG("ConnectedPool(%p) event_base_dispatch ...", arg);
event_base_dispatch(eventParam->_poolWorkBase);
}
// The worker thread owns the release of the event base.
if (eventParam->_poolWorkBase) {
LOG_DEBUG("ConnectedPool(%p) event_base_free ...", arg);
event_base_free(eventParam->_poolWorkBase);
eventParam->_poolWorkBase = NULL;
}
// Signals "thread finished" to the destructor's polling loop.
eventParam->_poolWorkThreadId = 0;
LOG_DEBUG("ConnectedPool(%p) loopConnectedPoolEventCallback exit.", arg);
#if defined(_MSC_VER)
return Success;
#else
return NULL;
#endif
}
/**
 * Periodic timer callback of the pool.
 *
 * Under the pool lock it runs the timeout checks on the prestarted and
 * preconnected vectors of every sub-pool that is marked as working, re-arms
 * the timer and, when at least one node was flagged by those checks,
 * activates the node-release event.
 *
 * The timer re-arm and the event activation are performed while the lock is
 * still held and only when the event pointers are non-NULL:
 * stopConnectedPool() frees both events and resets the pointers under the
 * same lock, and the constructor tolerates a NULL _nodeReleaseEvent, so
 * touching them unguarded after unlocking could dereference a NULL or freed
 * event.
 *
 * @param socketFd unused (timer event).
 * @param event    libevent flags; EV_CLOSED skips the timeout checks.
 * @param arg      the owning ConnectedPool instance.
 */
void ConnectedPool::connectPoolEventCallback(evutil_socket_t socketFd,
                                             short event, void *arg) {
  ConnectedPool *pool = static_cast<ConnectedPool *>(arg);
#ifdef ENABLE_NLS_DEBUG_2
  uint64_t timewait_start, timewait_a, timewait_end = 0;
  timewait_start = utility::TextUtils::GetTimestampMs();
  MUTEX_LOCK_WITH_TAG(pool->_lock, pool);
  timewait_a = utility::TextUtils::GetTimestampMs();
#else
  MUTEX_LOCK_WITH_TAG(pool->_lock, pool);
#endif

  LOG_DEBUG("Pool(%p) connectPoolEventCallback checking every pre-node ...",
            pool);

  int releaseCount = 0;
  if (event != EV_CLOSED) {
    // Regular timer tick (EV_TIMEOUT): check every active sub-pool.
    if (pool->_fssRequests.work) {
      releaseCount +=
          pool->timeoutPrestartedNode(&pool->_fssRequests.prestartedRequests);
      releaseCount += pool->timeoutPreconnectedNode(
          &pool->_fssRequests.preconnectedRequests);
    }
    if (pool->_srRequests.work) {
      releaseCount +=
          pool->timeoutPrestartedNode(&pool->_srRequests.prestartedRequests);
      releaseCount += pool->timeoutPreconnectedNode(
          &pool->_srRequests.preconnectedRequests);
    }
    if (pool->_stRequests.work) {
      releaseCount +=
          pool->timeoutPrestartedNode(&pool->_stRequests.prestartedRequests);
      releaseCount += pool->timeoutPreconnectedNode(
          &pool->_stRequests.preconnectedRequests);
    }
    if (pool->_syRequests.work) {
      releaseCount +=
          pool->timeoutPrestartedNode(&pool->_syRequests.prestartedRequests);
      releaseCount += pool->timeoutPreconnectedNode(
          &pool->_syRequests.preconnectedRequests);
    }
  }

  // Re-arm the timer for the next tick, unless the pool has been stopped
  // and the event released in the meantime.
  if (pool->_connectPoolEvent != NULL) {
    evtimer_add(pool->_connectPoolEvent, &pool->_connectPoolTimerTv);
  }

  // Hand flagged nodes over to nodeReleaseEventCallback. event_active()
  // only marks the event as active; the callback runs later from the loop.
  if (releaseCount > 0 && pool->_nodeReleaseEvent != NULL) {
    event_active(pool->_nodeReleaseEvent, EV_READ, 0);
  }

  MUTEX_UNLOCK_WITH_TAG(pool->_lock, pool);

#ifdef ENABLE_NLS_DEBUG_2
  timewait_end = utility::TextUtils::GetTimestampMs();
  if (timewait_end - timewait_start > 50) {
    LOG_WARN(
        "Pool(%p) connectPoolEventCallback done with excessive time:%llums, "
        "lock:%llums, work:%llums",
        pool, timewait_end - timewait_start, timewait_a - timewait_start,
        timewait_end - timewait_a);
  } else {
    LOG_DEBUG("Pool(%p) connectPoolEventCallback done", pool);
  }
#else
  LOG_DEBUG("Pool(%p) connectPoolEventCallback done", pool);
#endif
  return;
}
/**
 * Callback of the manually-activated node-release event.
 *
 * When invoked with exactly EV_READ, it runs
 * deleteOrPreconnectNodeShouldReleased() on the prestarted and then the
 * preconnected vector of every sub-pool that is marked as working. Any other
 * event value only produces the two debug log lines.
 *
 * @param socketFd unused (the event has no file descriptor).
 * @param event    libevent flags the event was activated with.
 * @param arg      the owning ConnectedPool instance.
 */
void ConnectedPool::nodeReleaseEventCallback(evutil_socket_t socketFd,
                                             short event, void *arg) {
  ConnectedPool *self = static_cast<ConnectedPool *>(arg);
  LOG_DEBUG(
      "Pool(%p) nodeReleaseEventCallback ready to delete or preconnect ...",
      self);

  const bool isReadEvent = (event == EV_READ);

  if (isReadEvent && self->_fssRequests.work) {
    self->deleteOrPreconnectNodeShouldReleased(
        &self->_fssRequests.prestartedRequests, "fssPrestarted");
    self->deleteOrPreconnectNodeShouldReleased(
        &self->_fssRequests.preconnectedRequests, "fssPreconnected");
  }

  if (isReadEvent && self->_srRequests.work) {
    self->deleteOrPreconnectNodeShouldReleased(
        &self->_srRequests.prestartedRequests, "srPrestarted");
    self->deleteOrPreconnectNodeShouldReleased(
        &self->_srRequests.preconnectedRequests, "srPreconnected");
  }

  if (isReadEvent && self->_stRequests.work) {
    self->deleteOrPreconnectNodeShouldReleased(
        &self->_stRequests.prestartedRequests, "stPrestarted");
    self->deleteOrPreconnectNodeShouldReleased(
        &self->_stRequests.preconnectedRequests, "stPreconnected");
  }

  if (isReadEvent && self->_syRequests.work) {
    self->deleteOrPreconnectNodeShouldReleased(
        &self->_syRequests.prestartedRequests, "syPrestarted");
    self->deleteOrPreconnectNodeShouldReleased(
        &self->_syRequests.preconnectedRequests, "syPreconnected");
  }

  LOG_DEBUG("Pool(%p) nodeReleaseEventCallback done", self);
}
/**
 * Start hook of the pool.
 *
 * Currently a no-op: the timer and the worker thread are already started by
 * the constructor, so this function takes no lock and changes no state.
 *
 * @return always Success.
 */
int ConnectedPool::startConnectedPool() {
  // Intentionally empty; nothing is left to start at this point.
  return Success;
}
int ConnectedPool::stopConnectedPool() {
#ifdef ENABLE_NLS_DEBUG_2
uint64_t timewait_start, timewait_end = 0;
timewait_start = utility::TextUtils::GetTimestampMs();
MUTEX_LOCK(_lock);
timewait_end = utility::TextUtils::GetTimestampMs();
if (timewait_end - timewait_start > 50) {
LOG_WARN(
"ConnectedPool(%p) stopConnectedPool lock with excessive time %llums.",
this, timewait_end - timewait_start);
} else {
LOG_DEBUG("ConnectedPool(%p) stopConnectedPool ...", this);
}
#else
MUTEX_LOCK(_lock);
LOG_DEBUG("ConnectedPool(%p) stopConnectedPool ...", this);
#endif
if (_connectPoolEvent != NULL) {
if (_connectPoolTimerFlag) {
evtimer_del(_connectPoolEvent);
_connectPoolTimerFlag = false;
} else {
LOG_DEBUG("ConnectedPool(%p) PoolEvent isnot in working ...", this);
}
event_free(_connectPoolEvent);
_connectPoolEvent = NULL;
}
if (_nodeReleaseEvent) {
event_del(_nodeReleaseEvent);
event_free(_nodeReleaseEvent);
_nodeReleaseEvent = NULL;
}
LOG_INFO("ConnectedPool(%p) break evbase(%p) ...", this, _poolWorkBase);
event_base_loopbreak(_poolWorkBase);
return Success;
}
/**
 * Tries to hand an already-started pooled connection to @p request.
 *
 * If the sub-pool of @p type reports zero prestarted or zero preconnected
 * slots, the sub-pool is (re)initialized via initThisNodesPool() and the call
 * fails. Otherwise, when prestarted nodes are available, the pick is
 * delegated to popOnePrestartedNode().
 *
 * @param request the request that wants a pooled node; NULL is rejected.
 * @param type    the request type selecting the sub-pool.
 * @return true only if popOnePrestartedNode() succeeded.
 */
bool ConnectedPool::popPrestartedNode(INlsRequest *request, NlsType type) {
  if (NULL == request) {
    return false;
  }

#ifdef ENABLE_NLS_DEBUG_2
  uint64_t timewait_start, timewait_end = 0;
  timewait_start = utility::TextUtils::GetTimestampMs();
  MUTEX_LOCK_WITH_TAG(_lock, request);
  timewait_end = utility::TextUtils::GetTimestampMs();
  if (timewait_end - timewait_start > 50) {
    LOG_WARN(
        "ConnectedPool(%p) popPrestartedNode Request(%p) lock with excessive "
        "time %llums.",
        this, request, timewait_end - timewait_start);
  } else {
    LOG_DEBUG("ConnectedPool(%p) popPrestartedNode Request(%p) ...", this,
              request);
  }
#else
  MUTEX_LOCK_WITH_TAG(_lock, request);
  LOG_DEBUG("ConnectedPool(%p) popPrestartedNode Request(%p) ...", this,
            request);
#endif

  bool picked = false;
  int prestarted = 0;
  int preconnected = 0;
  getNumberOfThisTypeNodes(type, prestarted, preconnected);

  if (prestarted == 0 || preconnected == 0) {
    // Step 0: the sub-pool has no slots yet, so populate both the
    // preconnected and the prestarted vectors; nothing can be popped now.
    initThisNodesPool(type);
    LOG_DEBUG(
        "ConnectedPool(%p) popPrestartedNode initThisNodesPool done with "
        "request(%p). prestarted is %d, preconnected is %d.",
        this, request, prestarted, preconnected);
  } else {
    const int count = getNumberOfPrestartedNodes(type);
    LOG_DEBUG(
        "ConnectedPool(%p) popPrestartedNode, request(%p), prestarted is %d, "
        "count of PrestartedNodes is %d.",
        this, request, prestarted, count);
    // Step 1: a started node exists, try to take it.
    if (prestarted > 0 && count > 0 && popOnePrestartedNode(request, type)) {
      picked = true;
    }
  }

  MUTEX_UNLOCK_WITH_TAG(_lock, request);
  return picked;
}
/**
 * Tries to hand an already-connected pooled connection to @p request.
 *
 * If the sub-pool of @p type reports zero prestarted or zero preconnected
 * slots, the sub-pool is (re)initialized via initThisNodesPool() and the call
 * fails. Otherwise, when preconnected nodes are available, the pick is
 * delegated to popOnePreconnectedNode().
 *
 * @param request the request that wants a pooled node; NULL is rejected.
 * @param type    the request type selecting the sub-pool.
 * @return true only if popOnePreconnectedNode() succeeded.
 */
bool ConnectedPool::popPreconnectedNode(INlsRequest *request, NlsType type) {
  if (NULL == request) {
    return false;
  }

#ifdef ENABLE_NLS_DEBUG_2
  uint64_t timewait_start, timewait_end = 0;
  timewait_start = utility::TextUtils::GetTimestampMs();
  MUTEX_LOCK_WITH_TAG(_lock, request);
  timewait_end = utility::TextUtils::GetTimestampMs();
  if (timewait_end - timewait_start > 50) {
    LOG_WARN("ConnectedPool(%p) Request(%p) lock with excessive time %llums.",
             this, request, timewait_end - timewait_start);
  }
#else
  MUTEX_LOCK_WITH_TAG(_lock, request);
#endif

  bool picked = false;
  int prestarted = 0;
  int preconnected = 0;
  getNumberOfThisTypeNodes(type, prestarted, preconnected);

  if (prestarted == 0 || preconnected == 0) {
    // Step 0: the sub-pool has no slots yet, so populate both the
    // preconnected and the prestarted vectors; nothing can be popped now.
    initThisNodesPool(type);
  } else if (preconnected > 0 && getNumberOfPreconnectedNodes(type) > 0) {
    // Step 2: a connected node exists, try to take it.
    if (popOnePreconnectedNode(request, type)) {
      picked = true;
    }
  }

  MUTEX_UNLOCK_WITH_TAG(_lock, request);
  return picked;
}
/**
 * Stores a connected request in the preconnected vector of its sub-pool, or
 * refreshes the slot it already occupies.
 *
 * - When @p newNode is false and the slot addressed by the node's pool index
 *   is in state PreNodeConnected with the same socket fd and SSL handle, only
 *   the slot's last-operation timestamp is refreshed and the slot is marked
 *   as not pickable with an invalid current request.
 * - Otherwise the first free slot (PreNodeToBeCreated, not flagged for
 *   release, no request) is filled with this request.
 *
 * The pool index is validated with `0 <= index < size` before the slot is
 * accessed; an index equal to the vector size is out of range and must not
 * be dereferenced.
 *
 * @param request the request to store; NULL is rejected.
 * @param type    the request type selecting the sub-pool.
 * @param newNode true if the request is known not to be in the pool yet.
 * @return true if a slot was refreshed or filled, false otherwise (unknown
 *         type or no free slot).
 */
bool ConnectedPool::pushPreconnectedNode(INlsRequest *request, NlsType type,
                                         bool newNode) {
  if (request == NULL) {
    return false;
  }

#ifdef ENABLE_NLS_DEBUG_2
  uint64_t timewait_start, timewait_end = 0;
  timewait_start = utility::TextUtils::GetTimestampMs();
  MUTEX_LOCK_WITH_TAG(_lock, request);
  timewait_end = utility::TextUtils::GetTimestampMs();
  if (timewait_end - timewait_start > 50) {
    LOG_WARN("ConnectedPool(%p) Request(%p) lock with excessive time %llums.",
             this, request, timewait_end - timewait_start);
  }
#else
  MUTEX_LOCK_WITH_TAG(_lock, request);
#endif

  int index = request->getConnectNode()->getPoolIndex();
  LOG_DEBUG(
      "ConnectedPool(%p) input %s request(%p) ndoe(%p) "
      "with "
      "index(%d) and "
      "type[(%d)"
      "0:Asr,1:SpeechTranscriber,2:TTS,3:StreamInputTts] ...",
      this, newNode ? "new" : "old", request, request->getConnectNode(), index,
      type);

  // Step 0: if the sub-pool has no slots yet, populate both the preconnected
  // and the prestarted vectors first.
  int prestarted = 0;
  int preconnected = 0;
  getNumberOfThisTypeNodes(type, prestarted, preconnected);
  if (prestarted == 0 || preconnected == 0) {
    initThisNodesPool(type);
  }

  evutil_socket_t curSocketFd = request->getConnectNode()->getSocketFd();
  SSLconnect *curSslHandle = request->getConnectNode()->getSslHandle();

  struct ConnectedPoolProcess *poolProcess = NULL;
  std::vector<struct ConnectedNodeProcess> *curPool = NULL;
  switch (type) {
    case TypeAsr:
      poolProcess = &_srRequests;
      curPool = &_srRequests.preconnectedRequests;
      break;
    case TypeRealTime:
      poolProcess = &_stRequests;
      curPool = &_stRequests.preconnectedRequests;
      break;
    case TypeTts:
      poolProcess = &_syRequests;
      curPool = &_syRequests.preconnectedRequests;
      break;
    case TypeStreamInputTts:
      poolProcess = &_fssRequests;
      curPool = &_fssRequests.preconnectedRequests;
      break;
    default:
      break;
  }

  if (curPool) {
    if (!newNode) {
      /* If the node is already stored, just refresh its last-operation
       * timestamp. A push also means the user is done with this SSL. */
      if (index >= 0 && static_cast<size_t>(index) < curPool->size()) {
        struct ConnectedNodeProcess &node = (*curPool)[index];
        if (node.status == PreNodeConnected) {
#ifdef ENABLE_NLS_DEBUG_2
          LOG_DEBUG(
              "ConnectedPool(%p) pushPreconnectedNode request(%p) compare to "
              "request(%p) ...",
              this, request, node.request);
#endif
          evutil_socket_t itSocketFd =
              node.request->getConnectNode()->getSocketFd();
          SSLconnect *itSslHandle =
              node.request->getConnectNode()->getSslHandle();
          if (curSocketFd == itSocketFd && curSslHandle == itSslHandle) {
            uint64_t oldTimestamp = node.workableTimestamp;
            node.workableTimestamp = request->getConnectNode()
                                         ->getNodeProcess()
                                         ->last_op_timestamp_ms;
            node.canPick = false;
            /* curRequest is reset to NULL later, in finishPushPreNode. */
            node.curRequestInvalid = true;
            LOG_INFO(
                "ConnectedPool(%p) pushPreconnectedNode input request(%p) "
                "node(%p) "
                "is existent in index(%d/%d), update last operation "
                "timestamp(from %s to %s). SSL handle is "
                "%p and SocketFd is %d.",
                this, request, request->getConnectNode(), index,
                static_cast<int>(curPool->size()),
                utility::TextUtils::GetTimeFromMs(oldTimestamp).c_str(),
                utility::TextUtils::GetTimeFromMs(node.workableTimestamp)
                    .c_str(),
                node.sslHandle, node.socketFd);
            MUTEX_UNLOCK_WITH_TAG(_lock, request);
            return true;
          }
        }
      }
    }

    /* Not stored yet: put it into the first free slot. */
    for (std::vector<struct ConnectedNodeProcess>::iterator it =
             curPool->begin();
         it != curPool->end(); ++it) {
      if (it->status == PreNodeToBeCreated && !it->shouldRelease &&
          it->request == NULL) {
        index = std::distance(curPool->begin(), it);
        it->type = request->getRequestParam()->_mode;
        it->status = PreNodeConnected;
        it->workableTimestamp =
            request->getConnectNode()->getNodeProcess()->last_op_timestamp_ms;
        it->startTimestamp = it->workableTimestamp;
        if (request->getRequestParam()->_mode == TypeTts) {
          it->ttsVersion = request->getRequestParam()->getVersion();
        }
        it->sdkName = request->getRequestParam()->getSdkName();
        it->startedResponse.clear();
        it->request = request;
        it->socketFd = curSocketFd;
        it->sslHandle = curSslHandle;
        it->canPick = false;
        it->curRequest = NULL;
        it->shouldRelease = false;
        it->shouldPreconnect = false;
        it->curRequestInvalid = false;
        it->isAbnormal = false;
        request->getConnectNode()->setPoolIndex(index);
        poolProcess->work = true;

        // Slot expiry: capped at now + 12h; otherwise 1h before the token
        // expires when that still lies in the future, else the raw token
        // expiry. Tokens with an expiry <= 1h (e.g. 0) leave the slot's
        // previous value untouched.
        uint64_t tokenExpirationTimestamp =
            request->getRequestParam()->_tokenExpirationTime;
        const int redundancyTimeDiffMs = 3600000;  // 1h
        if (tokenExpirationTimestamp > redundancyTimeDiffMs) {
          uint64_t nowTimestamp = utility::TextUtils::GetTimestampMs();
          const uint64_t diffTimeMs = 43200000;  // 12h
          it->tokenExpirationTimestamp =
              (tokenExpirationTimestamp >= nowTimestamp + diffTimeMs)
                  ? nowTimestamp + diffTimeMs
                  : (tokenExpirationTimestamp >=
                             nowTimestamp + redundancyTimeDiffMs
                         ? tokenExpirationTimestamp - redundancyTimeDiffMs
                         : tokenExpirationTimestamp);
        }

        LOG_INFO(
            "ConnectedPool(%p) pushPreconnectedNode input request(%p) node(%p) "
            "done in index(%d/%d), start timestamp(%s), last operation "
            "timestamp(%s), token expiration timestamp(%s). SSL handle "
            "is "
            "%p and SocketFd is %d.",
            this, request, request->getConnectNode(), index,
            static_cast<int>(curPool->size()),
            utility::TextUtils::GetTimeFromMs(it->startTimestamp).c_str(),
            utility::TextUtils::GetTimeFromMs(it->workableTimestamp).c_str(),
            utility::TextUtils::GetTimeFromMs(it->tokenExpirationTimestamp)
                .c_str(),
            it->sslHandle, it->socketFd);
        MUTEX_UNLOCK_WITH_TAG(_lock, request);
        return true;
      }
    }  // for
  }

  MUTEX_UNLOCK_WITH_TAG(_lock, request);
  return false;
}
/**
 * Stores a started request in the prestarted vector of its sub-pool, or
 * refreshes the slot it already occupies.
 *
 * - When @p newNode is false and the slot addressed by the node's pool index
 *   is in state PreNodeStarted with the same socket fd and SSL handle, only
 *   the slot's last-operation timestamp is refreshed and the slot is marked
 *   as not pickable with an invalid current request.
 * - Otherwise the first free slot (PreNodeToBeCreated, not flagged for
 *   release, no request) is filled with this request.
 *
 * The pool index is validated with `0 <= index < size` before the slot is
 * accessed; an index equal to the vector size is out of range and must not
 * be dereferenced.
 *
 * @param request the request to store; NULL is rejected.
 * @param type    the request type selecting the sub-pool.
 * @param newNode true if the request is known not to be in the pool yet.
 * @return true if a slot was refreshed or filled, false otherwise (unknown
 *         type or no free slot).
 */
bool ConnectedPool::pushPrestartedNode(INlsRequest *request, NlsType type,
                                       bool newNode) {
  if (request == NULL) {
    return false;
  }

#ifdef ENABLE_NLS_DEBUG_2
  uint64_t a_ms = utility::TextUtils::GetTimestampMs();
  uint64_t timewait_start, timewait_end = 0;
  timewait_start = utility::TextUtils::GetTimestampMs();
  MUTEX_LOCK_WITH_TAG(_lock, request);
  timewait_end = utility::TextUtils::GetTimestampMs();
  if (timewait_end - timewait_start > 50) {
    LOG_WARN("ConnectedPool(%p) Request(%p) lock with excessive time %llums.",
             this, request, timewait_end - timewait_start);
  } else {
    LOG_DEBUG("ConnectedPool(%p) pushPrestartedNode request(%p) ...", this,
              request);
  }
#else
  MUTEX_LOCK_WITH_TAG(_lock, request);
#endif

  int index = request->getConnectNode()->getPoolIndex();
#ifdef ENABLE_NLS_DEBUG_2
  uint64_t b_ms = utility::TextUtils::GetTimestampMs();
#endif
  LOG_DEBUG(
      "ConnectedPool(%p) input %s request(%p) ndoe(%p) with "
      "index(%d) and "
      "type[(%d)"
      "0:Asr,1:SpeechTranscriber,2:TTS,3:StreamInputTts] ...",
      this, newNode ? "new" : "old", request, request->getConnectNode(), index,
      type);

  // Step 0: if the sub-pool has no slots yet, populate both the preconnected
  // and the prestarted vectors first.
  int prestarted = 0;
  int preconnected = 0;
  getNumberOfThisTypeNodes(type, prestarted, preconnected);
  if (prestarted == 0 || preconnected == 0) {
    initThisNodesPool(type);
  }
#ifdef ENABLE_NLS_DEBUG_2
  uint64_t c_ms = utility::TextUtils::GetTimestampMs();
#endif

  evutil_socket_t curSocketFd = request->getConnectNode()->getSocketFd();
  SSLconnect *curSslHandle = request->getConnectNode()->getSslHandle();

  struct ConnectedPoolProcess *poolProcess = NULL;
  std::vector<struct ConnectedNodeProcess> *curPool = NULL;
  switch (type) {
    case TypeAsr:
      poolProcess = &_srRequests;
      curPool = &_srRequests.prestartedRequests;
      break;
    case TypeRealTime:
      poolProcess = &_stRequests;
      curPool = &_stRequests.prestartedRequests;
      break;
    case TypeTts:
      poolProcess = &_syRequests;
      curPool = &_syRequests.prestartedRequests;
      break;
    case TypeStreamInputTts:
      poolProcess = &_fssRequests;
      curPool = &_fssRequests.prestartedRequests;
      break;
    default:
      break;
  }

  if (curPool) {
    if (!newNode) {
      /* If the node is already stored, just refresh its last-operation
       * timestamp. A push also means the user is done with this SSL. */
      if (index >= 0 && static_cast<size_t>(index) < curPool->size()) {
        struct ConnectedNodeProcess &node = (*curPool)[index];
        if (node.status == PreNodeStarted) {
#ifdef ENABLE_NLS_DEBUG_2
          LOG_DEBUG(
              "ConnectedPool(%p) pushPrestartedNode request(%p) compare to "
              "request(%p) ...",
              this, request, node.request);
#endif
          evutil_socket_t itSocketFd =
              node.request->getConnectNode()->getSocketFd();
          SSLconnect *itSslHandle =
              node.request->getConnectNode()->getSslHandle();
          if (curSocketFd == itSocketFd && curSslHandle == itSslHandle) {
            uint64_t oldTimestamp = node.workableTimestamp;
            node.workableTimestamp = request->getConnectNode()
                                         ->getNodeProcess()
                                         ->last_op_timestamp_ms;
            node.canPick = false;
            /* curRequest is reset to NULL later, in finishPushPreNode. */
            node.curRequestInvalid = true;
#ifdef ENABLE_NLS_DEBUG_2
            uint64_t d_ms = utility::TextUtils::GetTimestampMs();
            LOG_DEBUG(
                "ConnectedPool(%p) pushPrestartedNode latency request(%p) "
                "lock:%llu init:%llu exist:%llu, index:%d",
                this, request, b_ms - a_ms, c_ms - b_ms, d_ms - c_ms, index);
#endif
            LOG_INFO(
                "ConnectedPool(%p) pushPrestartedNode input request(%p) "
                "node(%p) "
                "is existent in index(%d/%d), update last operation "
                "timestamp(from %s to %s). SSL handle is "
                "%p and SocketFd is %d.",
                this, request, request->getConnectNode(), index,
                static_cast<int>(curPool->size()),
                utility::TextUtils::GetTimeFromMs(oldTimestamp).c_str(),
                utility::TextUtils::GetTimeFromMs(node.workableTimestamp)
                    .c_str(),
                node.sslHandle, node.socketFd);
            MUTEX_UNLOCK_WITH_TAG(_lock, request);
            return true;
          }
        }
      }
    }

#ifdef ENABLE_NLS_DEBUG_2
    uint64_t d_ms = utility::TextUtils::GetTimestampMs();
#endif
    /* Not stored yet: put it into the first free slot. */
    for (std::vector<struct ConnectedNodeProcess>::iterator it =
             curPool->begin();
         it != curPool->end(); ++it) {
      if (it->status == PreNodeToBeCreated && !it->shouldRelease &&
          it->request == NULL) {
        index = std::distance(curPool->begin(), it);
        it->type = request->getRequestParam()->_mode;
        it->status = PreNodeStarted;
        it->workableTimestamp =
            request->getConnectNode()->getNodeProcess()->last_op_timestamp_ms;
        it->startTimestamp = it->workableTimestamp;
        if (request->getRequestParam()->_mode == TypeTts) {
          it->ttsVersion = request->getRequestParam()->getVersion();
        }
        it->sdkName = request->getRequestParam()->getSdkName();
        it->startedResponse.clear();
        it->request = request;
        it->socketFd = curSocketFd;
        it->sslHandle = curSslHandle;
        it->canPick = false;
        it->curRequest = NULL;
        it->shouldRelease = false;
        it->shouldPreconnect = false;
        it->curRequestInvalid = false;
        it->isAbnormal = false;
        request->getConnectNode()->setPoolIndex(index);
        poolProcess->work = true;

        // Slot expiry: capped at now + 12h; otherwise 1h before the token
        // expires when that still lies in the future, else the raw token
        // expiry. Tokens with an expiry <= 1h (e.g. 0) leave the slot's
        // previous value untouched.
        uint64_t tokenExpirationTimestamp =
            request->getRequestParam()->_tokenExpirationTime;
        const int redundancyTimeDiffMs = 3600000;  // 1h
        if (tokenExpirationTimestamp > redundancyTimeDiffMs) {
          uint64_t nowTimestamp = utility::TextUtils::GetTimestampMs();
          const uint64_t diffTimeMs = 43200000;  // 12h
          it->tokenExpirationTimestamp =
              (tokenExpirationTimestamp >= nowTimestamp + diffTimeMs)
                  ? nowTimestamp + diffTimeMs
                  : (tokenExpirationTimestamp >=
                             nowTimestamp + redundancyTimeDiffMs
                         ? tokenExpirationTimestamp - redundancyTimeDiffMs
                         : tokenExpirationTimestamp);
        }

#ifdef ENABLE_NLS_DEBUG_2
        uint64_t e_ms = utility::TextUtils::GetTimestampMs();
        LOG_DEBUG(
            "ConnectedPool(%p) pushPrestartedNode latency request(%p) "
            "lock:%llu init:%llu exist:%llu inexist:%llu",
            this, request, b_ms - a_ms, c_ms - b_ms, d_ms - c_ms, e_ms - d_ms);
#endif
        LOG_INFO(
            "ConnectedPool(%p) pushPrestartedNode input request(%p) node(%p) "
            "done in index(%d/%d), start timestamp(%s), last operation "
            "timestamp(%s), token expiration timestamp(%s). SSL handle "
            "is "
            "%p and SocketFd is %d.",
            this, request, request->getConnectNode(), index,
            static_cast<int>(curPool->size()),
            utility::TextUtils::GetTimeFromMs(it->startTimestamp).c_str(),
            utility::TextUtils::GetTimeFromMs(it->workableTimestamp).c_str(),
            utility::TextUtils::GetTimeFromMs(it->tokenExpirationTimestamp)
                .c_str(),
            it->sslHandle, it->socketFd);
        MUTEX_UNLOCK_WITH_TAG(_lock, request);
        return true;
      }
    }  // for
  }

#ifdef ENABLE_NLS_DEBUG_2
  uint64_t e_ms = utility::TextUtils::GetTimestampMs();
  LOG_DEBUG(
      "ConnectedPool(%p) pushPrestartedNode latency request(%p) "
      "lock:%llu init:%llu work:%llu",
      this, request, b_ms - a_ms, c_ms - b_ms, e_ms - c_ms);
#endif
  MUTEX_UNLOCK_WITH_TAG(_lock, request);
  return false;
}
/**
 * Moves a pooled node from the preconnected vector to the prestarted vector
 * of its sub-pool.
 *
 * The node is located in the preconnected vector through the pool index of
 * @p request's connect node and must be in state PreNodeConnected with the
 * same socket fd and SSL handle. Its data is then copied into the first free
 * prestarted slot and the preconnected slot is cleared.
 *
 * The pool index is validated with `0 <= index < size` before the slot is
 * accessed; an index equal to the vector size is out of range and must not
 * be dereferenced.
 *
 * NOTE(review): the preconnected slot is cleared even when no free prestarted
 * slot was found (the function then returns false) — confirm this is the
 * intended behavior.
 *
 * @param request a request whose connect node refers to the pooled slot;
 *                NULL is rejected.
 * @param type    the request type selecting the sub-pool.
 * @return true if the node was stored in a prestarted slot.
 */
bool ConnectedPool::pushPrestartedNodeFromPreconnected(INlsRequest *request,
                                                       NlsType type) {
  if (request == NULL) {
    return false;
  }

  MUTEX_LOCK_WITH_TAG(_lock, request);

  int index = request->getConnectNode()->getPoolIndex();
  if (index < 0) {
    LOG_WARN(
        "ConnectedPool(%p) input "
        "request(%p) node(%p) "
        "failed with index:%d.",
        this, request, request->getConnectNode(), index);
    MUTEX_UNLOCK_WITH_TAG(_lock, request);
    return false;
  }

  LOG_DEBUG(
      "ConnectedPool(%p) input request(%p) "
      "ndoe(%p) with "
      "index(%d) and "
      "type[(%d)"
      "0:Asr,1:SpeechTranscriber,2:TTS,3:StreamInputTts] ...",
      this, request, request->getConnectNode(), index, type);

  evutil_socket_t curSocketFd = request->getConnectNode()->getSocketFd();
  SSLconnect *curSslHandle = request->getConnectNode()->getSslHandle();

  struct ConnectedPoolProcess *poolProcess = NULL;
  // curPool0: source (preconnected) vector; curPool: target (prestarted).
  std::vector<struct ConnectedNodeProcess> *curPool0 = NULL;
  std::vector<struct ConnectedNodeProcess> *curPool = NULL;
  switch (type) {
    case TypeAsr:
      poolProcess = &_srRequests;
      curPool0 = &_srRequests.preconnectedRequests;
      curPool = &_srRequests.prestartedRequests;
      break;
    case TypeRealTime:
      poolProcess = &_stRequests;
      curPool0 = &_stRequests.preconnectedRequests;
      curPool = &_stRequests.prestartedRequests;
      break;
    case TypeTts:
      poolProcess = &_syRequests;
      curPool0 = &_syRequests.preconnectedRequests;
      curPool = &_syRequests.prestartedRequests;
      break;
    case TypeStreamInputTts:
      poolProcess = &_fssRequests;
      curPool0 = &_fssRequests.preconnectedRequests;
      curPool = &_fssRequests.prestartedRequests;
      break;
    default:
      break;
  }

  bool result = false;
  if (curPool0 && curPool) {
    /* Check that the node really is stored in the preconnected vector.
     * index is known to be >= 0 here, so only the upper bound is left. */
    if (static_cast<size_t>(index) < curPool0->size()) {
      struct ConnectedNodeProcess &node = (*curPool0)[index];
      if (node.status == PreNodeConnected) {
        evutil_socket_t itSocketFd =
            node.request->getConnectNode()->getSocketFd();
        SSLconnect *itSslHandle =
            node.request->getConnectNode()->getSslHandle();
        if (curSocketFd == itSocketFd && curSslHandle == itSslHandle) {
          for (std::vector<struct ConnectedNodeProcess>::iterator it =
                   curPool->begin();
               it != curPool->end(); ++it) {
            if (it->status == PreNodeToBeCreated && !it->shouldRelease &&
                it->request == NULL) {
              index = std::distance(curPool->begin(), it);
              it->type = node.request->getRequestParam()->_mode;
              it->status = PreNodeStarted;
              it->workableTimestamp = node.request->getConnectNode()
                                          ->getNodeProcess()
                                          ->last_op_timestamp_ms;
              it->startTimestamp = it->workableTimestamp;
              if (node.request->getRequestParam()->_mode == TypeTts) {
                it->ttsVersion = node.request->getRequestParam()->getVersion();
              }
              it->sdkName = request->getRequestParam()->getSdkName();
              it->startedResponse.clear();
              it->request = node.request;
              it->socketFd = curSocketFd;
              it->sslHandle = curSslHandle;
              it->canPick = false;
              it->curRequest = NULL;
              node.request->getConnectNode()->setPoolIndex(index);
              poolProcess->work = true;

              // Slot expiry: capped at now + 12h; otherwise 1h before the
              // token expires when that still lies in the future, else the
              // raw token expiry. Tokens with an expiry <= 1h (e.g. 0) leave
              // the slot's previous value untouched.
              uint64_t tokenExpirationTimestamp =
                  node.request->getRequestParam()->_tokenExpirationTime;
              const int redundancyTimeDiffMs = 3600000;  // 1h
              if (tokenExpirationTimestamp > redundancyTimeDiffMs) {
                uint64_t nowTimestamp = utility::TextUtils::GetTimestampMs();
                const uint64_t diffTimeMs = 43200000;  // 12h
                it->tokenExpirationTimestamp =
                    (tokenExpirationTimestamp >= nowTimestamp + diffTimeMs)
                        ? nowTimestamp + diffTimeMs
                        : (tokenExpirationTimestamp >=
                                   nowTimestamp + redundancyTimeDiffMs
                               ? tokenExpirationTimestamp - redundancyTimeDiffMs
                               : tokenExpirationTimestamp);
              }

              LOG_INFO(
                  "ConnectedPool(%p) pushPrestartedNodeFromPreconnected input "
                  "request(%p) node(%p) "
                  "done in index(%d/%d), start timestamp(%s), last operation "
                  "timestamp(%s), token expiration timestamp(%s). SSL handle "
                  "is "
                  "%p and SocketFd is %d.",
                  this, request, request->getConnectNode(), index,
                  static_cast<int>(curPool->size()),
                  utility::TextUtils::GetTimeFromMs(it->startTimestamp).c_str(),
                  utility::TextUtils::GetTimeFromMs(it->workableTimestamp)
                      .c_str(),
                  utility::TextUtils::GetTimeFromMs(
                      it->tokenExpirationTimestamp)
                      .c_str(),
                  it->sslHandle, it->socketFd);
              result = true;
              break;
            }
          }  // for

          // Empty the source slot in the preconnected vector.
          node.clearNode();
        }  // find
      }
    }
  }

  MUTEX_UNLOCK_WITH_TAG(_lock, request);
  return result;
}
/**
 * Tells whether the connection used by @p request (identified by socket fd
 * and SSL handle) is stored in the sub-pool of @p type.
 *
 * The prestarted vector is scanned first (slots in state PreNodeStarted),
 * then the preconnected vector (slots in state PreNodeConnected). A vector is
 * only scanned when its node counter is positive.
 *
 * @param request              the request whose connection is looked up.
 * @param type                 the request type selecting the sub-pool.
 * @param oriRequestIsAbnormal set to the matching slot's isAbnormal flag;
 *                             left untouched when nothing matches.
 * @param requestInPool        set to true when the matching slot stores this
 *                             very request; never reset to false here.
 * @return true if a matching slot was found.
 */
bool ConnectedPool::sslBelongToPool(INlsRequest *request, NlsType type,
                                    bool &oriRequestIsAbnormal,
                                    bool &requestInPool) {
#ifdef ENABLE_NLS_DEBUG_2
  uint64_t timewait_start, timewait_end = 0;
  timewait_start = utility::TextUtils::GetTimestampMs();
  MUTEX_LOCK_WITH_TAG(_lock, request);
  timewait_end = utility::TextUtils::GetTimestampMs();
  if (timewait_end - timewait_start > 50) {
    LOG_WARN("ConnectedPool(%p) Request(%p) lock with excessive time %llums.",
             this, request, timewait_end - timewait_start);
  } else {
    LOG_DEBUG("ConnectedPool(%p) Request(%p) with lock(%p) sslBelongToPool ...",
              this, request, &_lock);
  }
#else
  MUTEX_LOCK_WITH_TAG(_lock, request);
#endif

  const evutil_socket_t wantedFd = request->getConnectNode()->getSocketFd();
  SSLconnect *const wantedSsl = request->getConnectNode()->getSslHandle();

  // Pass 0 inspects the prestarted vector, pass 1 the preconnected one.
  for (int pass = 0; pass < 2; ++pass) {
    const bool scanStarted = (pass == 0);

    const int nodeCount = scanStarted ? getNumberOfPrestartedNodes(type)
                                      : getNumberOfPreconnectedNodes(type);
    if (!(nodeCount > 0)) {
      continue;
    }

    std::vector<struct ConnectedNodeProcess> *candidates = NULL;
    if (type == TypeAsr) {
      candidates = scanStarted ? &_srRequests.prestartedRequests
                               : &_srRequests.preconnectedRequests;
    } else if (type == TypeRealTime) {
      candidates = scanStarted ? &_stRequests.prestartedRequests
                               : &_stRequests.preconnectedRequests;
    } else if (type == TypeTts) {
      candidates = scanStarted ? &_syRequests.prestartedRequests
                               : &_syRequests.preconnectedRequests;
    } else if (type == TypeStreamInputTts) {
      candidates = scanStarted ? &_fssRequests.prestartedRequests
                               : &_fssRequests.preconnectedRequests;
    }
    if (candidates == NULL) {
      continue;
    }

    for (size_t i = 0; i < candidates->size(); ++i) {
      struct ConnectedNodeProcess &slot = (*candidates)[i];
      if (slot.status != (scanStarted ? PreNodeStarted : PreNodeConnected)) {
        continue;
      }
      if (wantedFd != slot.socketFd || wantedSsl != slot.sslHandle) {
        continue;
      }

      // Same connection found: report its state to the caller.
      oriRequestIsAbnormal = slot.isAbnormal;
      if (request == slot.request) {
        requestInPool = true;
      }
      MUTEX_UNLOCK_WITH_TAG(_lock, request);
      return true;
    }
  }

  MUTEX_UNLOCK_WITH_TAG(_lock, request);
  return false;
}
/**
 * @brief Flag the pool entry backing this request's connection as abnormal.
 *
 * Searches for an entry whose socket fd and SSL handle both equal those of
 * the request's connect node: prestarted entries are examined first, then
 * preconnected ones. The first match gets isAbnormal set and its index is
 * handed to removeElement() on the corresponding index list.
 *
 * @param request Request whose connect node supplies the fd/SSL pair.
 * @param type    Selects which per-type pool set is searched.
 */
void ConnectedPool::curRequestIsAbnormal(INlsRequest *request, NlsType type) {
#ifdef ENABLE_NLS_DEBUG_2
  const uint64_t lockWaitBegin = utility::TextUtils::GetTimestampMs();
  MUTEX_LOCK_WITH_TAG(_lock, request);
  const uint64_t lockWaitEnd = utility::TextUtils::GetTimestampMs();
  if (lockWaitEnd - lockWaitBegin > 50) {
    LOG_WARN("ConnectedPool(%p) Request(%p) lock with excessive time %llums.",
             this, request, lockWaitEnd - lockWaitBegin);
  }
#else
  MUTEX_LOCK_WITH_TAG(_lock, request);
#endif

  const evutil_socket_t wantedFd = request->getConnectNode()->getSocketFd();
  SSLconnect *const wantedSsl = request->getConnectNode()->getSslHandle();

  // pass 0 walks the prestarted pool, pass 1 the preconnected pool.
  for (int pass = 0; pass < 2; ++pass) {
    const bool startedPass = (0 == pass);
    const int liveNodes = startedPass ? getNumberOfPrestartedNodes(type)
                                      : getNumberOfPreconnectedNodes(type);
    if (liveNodes <= 0) {
      continue;
    }

    std::vector<struct ConnectedNodeProcess> *nodes = NULL;
    std::list<int> *indexList = NULL;
    switch (type) {
      case TypeAsr:
        nodes = startedPass ? &_srRequests.prestartedRequests
                            : &_srRequests.preconnectedRequests;
        indexList = startedPass ? &_srRequests.prestartedIndexList
                                : &_srRequests.preconnectedIndexList;
        break;
      case TypeRealTime:
        nodes = startedPass ? &_stRequests.prestartedRequests
                            : &_stRequests.preconnectedRequests;
        indexList = startedPass ? &_stRequests.prestartedIndexList
                                : &_stRequests.preconnectedIndexList;
        break;
      case TypeTts:
        nodes = startedPass ? &_syRequests.prestartedRequests
                            : &_syRequests.preconnectedRequests;
        indexList = startedPass ? &_syRequests.prestartedIndexList
                                : &_syRequests.preconnectedIndexList;
        break;
      case TypeStreamInputTts:
        nodes = startedPass ? &_fssRequests.prestartedRequests
                            : &_fssRequests.preconnectedRequests;
        indexList = startedPass ? &_fssRequests.prestartedIndexList
                                : &_fssRequests.preconnectedIndexList;
        break;
      default:
        break;
    }
    if (NULL == nodes) {
      continue;
    }

    for (size_t slot = 0; slot < nodes->size(); ++slot) {
      struct ConnectedNodeProcess &node = (*nodes)[slot];
      if (node.status != (startedPass ? PreNodeStarted : PreNodeConnected)) {
        continue;
      }
      if (wantedFd != node.socketFd || wantedSsl != node.sslHandle) {
        continue;
      }

      node.isAbnormal = true;
      const int index = static_cast<int>(slot);
      removeElement(*indexList, index);
      LOG_INFO(
          "ConnectedPool(%p) curRequestIsAbnormal Request(%p) SSL(%p) "
          "SocketFd(%d) Index(%d) is abnormal.",
          this, request, wantedSsl, wantedFd, index);
      MUTEX_UNLOCK_WITH_TAG(_lock, request);
      return;
    }
  }

  // Nothing matched in either pool.
  MUTEX_UNLOCK_WITH_TAG(_lock, request);
  LOG_DEBUG("ConnectedPool(%p) Request(%p) curRequestIsAbnormal done", this,
            request);
}
/**
 * @brief Mark the pool entry owning (curSocketFd, curSslHandle) as pickable
 *        again.
 *
 * The prestarted pool is scanned first, then the preconnected pool, both
 * starting at the hinted index. The first entry with the expected status
 * whose fd and SSL handle both match gets canPick set, curRequest cleared,
 * curRequestInvalid reset, and its index inserted into the matching index
 * list via insertListInOrder(). If nothing matches, an error is logged.
 *
 * @param type         Selects which per-type pool set is searched.
 * @param curSocketFd  Socket fd identifying the entry.
 * @param curSslHandle SSL handle identifying the entry.
 * @param index        Slot hint where the scan starts; negative means 0. A
 *                     hint beyond the pool size yields an empty scan instead
 *                     of advancing an iterator past end().
 * @param request      Used as the lock tag and in log output only.
 */
void ConnectedPool::finishPushPreNode(NlsType type, evutil_socket_t curSocketFd,
                                      SSLconnect *curSslHandle, int index,
                                      INlsRequest *request) {
#ifdef ENABLE_NLS_DEBUG_2
  const uint64_t lockWaitBegin = utility::TextUtils::GetTimestampMs();
  MUTEX_LOCK_WITH_TAG(_lock, request);
  const uint64_t lockWaitEnd = utility::TextUtils::GetTimestampMs();
  if (lockWaitEnd - lockWaitBegin > 50) {
    LOG_WARN("ConnectedPool(%p) Request(%p) lock with excessive time %llums.",
             this, request, lockWaitEnd - lockWaitBegin);
  } else {
    LOG_DEBUG(
        "ConnectedPool(%p) input request(%p) with lock(%p), SSL "
        "handle "
        "is "
        "%p and SocketFd is %d. Index is %d.",
        this, request, &_lock, curSslHandle, curSocketFd, index);
  }
#else
  MUTEX_LOCK_WITH_TAG(_lock, request);
  LOG_DEBUG(
      "ConnectedPool(%p) input request(%p) with lock(%p), SSL "
      "handle "
      "is "
      "%p and SocketFd is %d. Index is %d.",
      this, request, &_lock, curSslHandle, curSocketFd, index);
#endif

  const size_t firstSlot = index >= 0 ? static_cast<size_t>(index) : 0;

  // pass 0 walks the prestarted pool, pass 1 the preconnected pool.
  for (int pass = 0; pass < 2; ++pass) {
    const bool startedPass = (0 == pass);
    const int liveNodes = startedPass ? getNumberOfPrestartedNodes(type)
                                      : getNumberOfPreconnectedNodes(type);
    if (liveNodes <= 0) {
      continue;
    }

    std::vector<struct ConnectedNodeProcess> *nodes = NULL;
    std::list<int> *indexList = NULL;
    switch (type) {
      case TypeAsr:
        nodes = startedPass ? &_srRequests.prestartedRequests
                            : &_srRequests.preconnectedRequests;
        indexList = startedPass ? &_srRequests.prestartedIndexList
                                : &_srRequests.preconnectedIndexList;
        break;
      case TypeRealTime:
        nodes = startedPass ? &_stRequests.prestartedRequests
                            : &_stRequests.preconnectedRequests;
        indexList = startedPass ? &_stRequests.prestartedIndexList
                                : &_stRequests.preconnectedIndexList;
        break;
      case TypeTts:
        nodes = startedPass ? &_syRequests.prestartedRequests
                            : &_syRequests.preconnectedRequests;
        indexList = startedPass ? &_syRequests.prestartedIndexList
                                : &_syRequests.preconnectedIndexList;
        break;
      case TypeStreamInputTts:
        nodes = startedPass ? &_fssRequests.prestartedRequests
                            : &_fssRequests.preconnectedRequests;
        indexList = startedPass ? &_fssRequests.prestartedIndexList
                                : &_fssRequests.preconnectedIndexList;
        break;
      default:
        break;
    }
    if (NULL == nodes) {
      continue;
    }

#ifdef ENABLE_NLS_DEBUG_2
    const uint64_t scanBeginMs = utility::TextUtils::GetTimestampMs();
#endif
    // Comparing the slot against size() keeps the scan inside the vector even
    // when the caller's hint is larger than the pool.
    for (size_t slot = firstSlot; slot < nodes->size(); ++slot) {
      struct ConnectedNodeProcess &node = (*nodes)[slot];
      if (node.status != (startedPass ? PreNodeStarted : PreNodeConnected)) {
        continue;
      }
      if (curSocketFd != node.socketFd || curSslHandle != node.sslHandle) {
        continue;
      }

      const int curIndex = static_cast<int>(slot);
#ifdef ENABLE_NLS_DEBUG_2
      if (startedPass) {
        const uint64_t hitMs = utility::TextUtils::GetTimestampMs();
        LOG_DEBUG(
            "ConnectedPool(%p) finishPushPreNode latency PreNodeStarted "
            "request(%p) "
            "finish:%llu",
            this, node.request, hitMs - scanBeginMs);
      }
#endif
      LOG_INFO(
          "ConnectedPool(%p) request(%p) node(%p) [curRequest(%p)] can "
          "pick. "
          "SSL handle is %p and SocketFd is %d. Index is %d.",
          this, node.request, node.request->getConnectNode(), node.curRequest,
          node.sslHandle, node.socketFd, curIndex);
      node.canPick = true;
      node.curRequest = NULL;
      node.curRequestInvalid = false;
      insertListInOrder(*indexList, curIndex);
      MUTEX_UNLOCK_WITH_TAG(_lock, request);
      return;
    }
#ifdef ENABLE_NLS_DEBUG_2
    if (!startedPass) {
      const uint64_t scanEndMs = utility::TextUtils::GetTimestampMs();
      LOG_DEBUG(
          "ConnectedPool(%p) finishPushPreNode latency PreNodeConnected "
          "finish:%llu",
          this, scanEndMs - scanBeginMs);
    }
#endif
  }

  // No entry in either pool matched the fd/SSL pair.
  LOG_ERROR("ConnectedPool(%p) Request(%p) finishPushPreNode occur exception!",
            this, request);
  MUTEX_UNLOCK_WITH_TAG(_lock, request);
  return;
}
/**
 * @brief Report whether `request` itself is stored as the owning request of
 *        a pool entry.
 *
 * Prestarted entries are checked before preconnected ones. An entry matches
 * when its stored request pointer equals `request` (and both yield the same
 * connect node).
 *
 * @param request Request to look for.
 * @param type    Selects which per-type pool set is searched.
 * @return true if a matching entry exists, false otherwise.
 */
bool ConnectedPool::requestInPool(INlsRequest *request, NlsType type) {
#ifdef ENABLE_NLS_DEBUG_2
  const uint64_t lockWaitBegin = utility::TextUtils::GetTimestampMs();
  MUTEX_LOCK_WITH_TAG(_lock, request);
  const uint64_t lockWaitEnd = utility::TextUtils::GetTimestampMs();
  if (lockWaitEnd - lockWaitBegin > 50) {
    LOG_WARN("ConnectedPool(%p) Request(%p) lock with excessive time %llums.",
             this, request, lockWaitEnd - lockWaitBegin);
  }
#else
  MUTEX_LOCK_WITH_TAG(_lock, request);
#endif

  typedef std::vector<struct ConnectedNodeProcess> NodePool;

  // pass 0 walks the prestarted pool, pass 1 the preconnected pool.
  for (int pass = 0; pass < 2; ++pass) {
    const bool startedPass = (0 == pass);
    const int preSize = startedPass ? getNumberOfPrestartedNodes(type)
                                    : getNumberOfPreconnectedNodes(type);
    if (preSize <= 0) {
      continue;
    }

    NodePool *nodes = NULL;
    switch (type) {
      case TypeAsr:
        nodes = startedPass ? &_srRequests.prestartedRequests
                            : &_srRequests.preconnectedRequests;
        break;
      case TypeRealTime:
        nodes = startedPass ? &_stRequests.prestartedRequests
                            : &_stRequests.preconnectedRequests;
        break;
      case TypeTts:
        nodes = startedPass ? &_syRequests.prestartedRequests
                            : &_syRequests.preconnectedRequests;
        break;
      case TypeStreamInputTts:
        nodes = startedPass ? &_fssRequests.prestartedRequests
                            : &_fssRequests.preconnectedRequests;
        break;
      default:
        break;
    }
    if (NULL == nodes) {
      continue;
    }

    for (NodePool::iterator node = nodes->begin(); node != nodes->end();
         ++node) {
      if (node->status != (startedPass ? PreNodeStarted : PreNodeConnected)) {
        continue;
      }
      if (request == node->request &&
          request->getConnectNode() == node->request->getConnectNode()) {
        if (startedPass) {
          LOG_DEBUG(
              "find prestarted node of request(%p) and node(%p) in "
              "size(%d).",
              request, request->getConnectNode(), preSize);
        } else {
          LOG_DEBUG(
              "find preconnected node of request(%p) and node(%p) in "
              "size(%d).",
              request, request->getConnectNode(), preSize);
        }
        MUTEX_UNLOCK_WITH_TAG(_lock, request);
        return true;
      }
#ifdef ENABLE_NLS_DEBUG_2
      // Debug builds trace every live entry that did not match.
      if (startedPass) {
        LOG_DEBUG(
            "request(%p) is not prestarted request(%p) node(%p) with "
            "index(%d).",
            request, node->request, node->request->getConnectNode(),
            std::distance(nodes->begin(), node));
      } else {
        LOG_DEBUG("request(%p) is not preconnected request(%p) node(%p)",
                  request, node->request, node->request->getConnectNode());
      }
#endif
    }
  }

  MUTEX_UNLOCK_WITH_TAG(_lock, request);
  return false;
}
/**
 * @brief Empty the pool entry whose owning request is `request`.
 *
 * Prestarted entries are checked before preconnected ones. The first entry
 * whose stored request pointer equals `request` is reset with clearNode().
 * The request object itself is not deleted here.
 *
 * @param request Request whose entry should be cleared.
 * @param type    Selects which per-type pool set is searched.
 * @return true if an entry was cleared, false if none matched.
 */
bool ConnectedPool::deletePreNodeByRequest(INlsRequest *request, NlsType type) {
#ifdef ENABLE_NLS_DEBUG_2
  const uint64_t lockWaitBegin = utility::TextUtils::GetTimestampMs();
  MUTEX_LOCK_WITH_TAG(_lock, request);
  const uint64_t lockWaitEnd = utility::TextUtils::GetTimestampMs();
  if (lockWaitEnd - lockWaitBegin > 50) {
    LOG_WARN("ConnectedPool(%p) Request(%p) lock with excessive time %llums.",
             this, request, lockWaitEnd - lockWaitBegin);
  }
#else
  MUTEX_LOCK_WITH_TAG(_lock, request);
#endif

  typedef std::vector<struct ConnectedNodeProcess> NodePool;

  // pass 0 walks the prestarted pool, pass 1 the preconnected pool.
  for (int pass = 0; pass < 2; ++pass) {
    const bool startedPass = (0 == pass);
    const int preSize = startedPass ? getNumberOfPrestartedNodes(type)
                                    : getNumberOfPreconnectedNodes(type);
    if (preSize <= 0) {
      continue;
    }

    NodePool *nodes = NULL;
    switch (type) {
      case TypeAsr:
        nodes = startedPass ? &_srRequests.prestartedRequests
                            : &_srRequests.preconnectedRequests;
        break;
      case TypeRealTime:
        nodes = startedPass ? &_stRequests.prestartedRequests
                            : &_stRequests.preconnectedRequests;
        break;
      case TypeTts:
        nodes = startedPass ? &_syRequests.prestartedRequests
                            : &_syRequests.preconnectedRequests;
        break;
      case TypeStreamInputTts:
        nodes = startedPass ? &_fssRequests.prestartedRequests
                            : &_fssRequests.preconnectedRequests;
        break;
      default:
        break;
    }
    if (NULL == nodes) {
      continue;
    }

    for (NodePool::iterator node = nodes->begin(); node != nodes->end();
         ++node) {
      if (node->status != (startedPass ? PreNodeStarted : PreNodeConnected)) {
        continue;
      }
      if (request == node->request &&
          request->getConnectNode() == node->request->getConnectNode()) {
        // Log first: clearNode() resets the fields being printed.
        if (startedPass) {
          LOG_DEBUG(
              "find prestarted node of request(%p) and node(%p) in "
              "size(%d) with SSL handler(%p), and remove from pool.",
              request, request->getConnectNode(), preSize, node->sslHandle);
        } else {
          LOG_DEBUG(
              "find preconnected node of request(%p) and node(%p) in "
              "size(%d) with SSL handler(%p), and remove from pool.",
              request, request->getConnectNode(), preSize, node->sslHandle);
        }
        node->clearNode();
        MUTEX_UNLOCK_WITH_TAG(_lock, request);
        return true;
      }
#ifdef ENABLE_NLS_DEBUG_2
      // Debug builds trace every live entry that did not match.
      if (startedPass) {
        LOG_DEBUG(
            "request(%p) is not prestarted request(%p) node(%p) with "
            "index(%d).",
            request, node->request, node->request->getConnectNode(),
            std::distance(nodes->begin(), node));
      } else {
        LOG_DEBUG("request(%p) is not preconnected request(%p) node(%p)",
                  request, node->request, node->request->getConnectNode());
      }
#endif
    }
  }

  MUTEX_UNLOCK_WITH_TAG(_lock, request);
  return false;
}
/**
 * @brief Delete the owning request of, and empty, the pool entry that holds
 *        `curSslHandle`.
 *
 * Prestarted entries are checked before preconnected ones. On the first
 * entry whose sslHandle equals `curSslHandle`, the stored request (if any)
 * is deleted and the entry is reset with clearNode().
 *
 * @param curSslHandle SSL handle identifying the entry; also the lock tag.
 * @param type         Selects which per-type pool set is searched.
 * @return true if an entry was found and cleared, false otherwise.
 */
bool ConnectedPool::deletePreNodeBySSL(SSLconnect *curSslHandle, NlsType type) {
#ifdef ENABLE_NLS_DEBUG_2
  const uint64_t lockWaitBegin = utility::TextUtils::GetTimestampMs();
  MUTEX_LOCK_WITH_TAG(_lock, curSslHandle);
  const uint64_t lockWaitEnd = utility::TextUtils::GetTimestampMs();
  if (lockWaitEnd - lockWaitBegin > 50) {
    LOG_WARN("ConnectedPool(%p) SSL(%p) lock with excessive time %llums.", this,
             curSslHandle, lockWaitEnd - lockWaitBegin);
  }
#else
  MUTEX_LOCK_WITH_TAG(_lock, curSslHandle);
#endif

  // pass 0 walks the prestarted pool, pass 1 the preconnected pool.
  for (int pass = 0; pass < 2; ++pass) {
    const bool startedPass = (0 == pass);
    const int preSize = startedPass ? getNumberOfPrestartedNodes(type)
                                    : getNumberOfPreconnectedNodes(type);
    if (preSize <= 0) {
      continue;
    }

    std::vector<struct ConnectedNodeProcess> *nodes = NULL;
    switch (type) {
      case TypeAsr:
        nodes = startedPass ? &_srRequests.prestartedRequests
                            : &_srRequests.preconnectedRequests;
        break;
      case TypeRealTime:
        nodes = startedPass ? &_stRequests.prestartedRequests
                            : &_stRequests.preconnectedRequests;
        break;
      case TypeTts:
        nodes = startedPass ? &_syRequests.prestartedRequests
                            : &_syRequests.preconnectedRequests;
        break;
      case TypeStreamInputTts:
        nodes = startedPass ? &_fssRequests.prestartedRequests
                            : &_fssRequests.preconnectedRequests;
        break;
      default:
        break;
    }
    if (NULL == nodes) {
      continue;
    }

    for (size_t slot = 0; slot < nodes->size(); ++slot) {
      struct ConnectedNodeProcess &node = (*nodes)[slot];
      if (node.status != (startedPass ? PreNodeStarted : PreNodeConnected)) {
        continue;
      }
      if (node.sslHandle != curSslHandle) {
        continue;
      }

      if (startedPass) {
        LOG_DEBUG(
            "find prestarted SSL(%p) in "
            "size(%d), and remove request(%p).",
            curSslHandle, preSize, node.request);
      } else {
        LOG_DEBUG(
            "find preconnected SSL(%p) in "
            "size(%d), and remove request(%p).",
            curSslHandle, preSize, node.request);
      }
      // Destroy the owning request before wiping the entry.
      if (node.request) {
        delete node.request;
        node.request = NULL;
      }
      node.clearNode();
      MUTEX_UNLOCK_WITH_TAG(_lock, curSslHandle);
      return true;
    }
  }

  MUTEX_UNLOCK_WITH_TAG(_lock, curSslHandle);
  return false;
}
/**
 * @brief Report the container sizes of both pools for one request type.
 *
 * These are the vectors' element counts, regardless of each entry's status.
 *
 * @param type         Request type whose pools are measured.
 * @param prestarted   Receives the prestarted vector size (0 for an unknown
 *                     type).
 * @param preconnected Receives the preconnected vector size (0 for an unknown
 *                     type).
 * @return Always Success.
 */
int ConnectedPool::getNumberOfThisTypeNodes(NlsType type, int &prestarted,
                                            int &preconnected) {
  switch (type) {
    case TypeAsr:
      prestarted = static_cast<int>(_srRequests.prestartedRequests.size());
      preconnected = static_cast<int>(_srRequests.preconnectedRequests.size());
      break;
    case TypeRealTime:
      prestarted = static_cast<int>(_stRequests.prestartedRequests.size());
      preconnected = static_cast<int>(_stRequests.preconnectedRequests.size());
      break;
    case TypeTts:
      prestarted = static_cast<int>(_syRequests.prestartedRequests.size());
      preconnected = static_cast<int>(_syRequests.preconnectedRequests.size());
      break;
    case TypeStreamInputTts:
      prestarted = static_cast<int>(_fssRequests.prestartedRequests.size());
      preconnected = static_cast<int>(_fssRequests.preconnectedRequests.size());
      break;
    default:
      prestarted = 0;
      preconnected = 0;
      break;
  }
  return Success;
}
/**
 * @brief Count entries of the type's preconnected pool whose status is
 *        PreNodeConnected.
 *
 * @param type Request type whose preconnected pool is inspected.
 * @return Number of such entries; 0 for an unknown type.
 */
int ConnectedPool::getNumberOfPreconnectedNodes(NlsType type) {
  const std::vector<struct ConnectedNodeProcess> *nodes = NULL;
  switch (type) {
    case TypeAsr:
      nodes = &_srRequests.preconnectedRequests;
      break;
    case TypeRealTime:
      nodes = &_stRequests.preconnectedRequests;
      break;
    case TypeTts:
      nodes = &_syRequests.preconnectedRequests;
      break;
    case TypeStreamInputTts:
      nodes = &_fssRequests.preconnectedRequests;
      break;
    default:
      break;
  }
  if (NULL == nodes) {
    return 0;
  }

  int connected = 0;
  for (size_t slot = 0; slot < nodes->size(); ++slot) {
    if ((*nodes)[slot].status == PreNodeConnected) {
      ++connected;
    }
  }
  return connected;
}
/**
 * @brief Count entries of the type's prestarted pool whose status is
 *        PreNodeStarted.
 *
 * @param type Request type whose prestarted pool is inspected.
 * @return Number of such entries; 0 for an unknown type.
 */
int ConnectedPool::getNumberOfPrestartedNodes(NlsType type) {
  const std::vector<struct ConnectedNodeProcess> *nodes = NULL;
  switch (type) {
    case TypeAsr:
      nodes = &_srRequests.prestartedRequests;
      break;
    case TypeRealTime:
      nodes = &_stRequests.prestartedRequests;
      break;
    case TypeTts:
      nodes = &_syRequests.prestartedRequests;
      break;
    case TypeStreamInputTts:
      nodes = &_fssRequests.prestartedRequests;
      break;
    default:
      break;
  }
  if (NULL == nodes) {
    return 0;
  }

  int started = 0;
  for (size_t slot = 0; slot < nodes->size(); ++slot) {
    if ((*nodes)[slot].status == PreNodeStarted) {
      ++started;
    }
  }
  return started;
}
/**
 * @brief Grow both pools of one request type to _maxPreconnectedNumber
 *        entries.
 *
 * Every appended entry carries the given type and status PreNodeToBeCreated.
 * Pools that already hold at least that many entries are left untouched, and
 * an unknown type changes nothing.
 *
 * @param type Request type whose pools are padded.
 * @return Always Success.
 */
int ConnectedPool::initThisNodesPool(NlsType type) {
  // [0] = prestarted pool, [1] = preconnected pool; filled in that order.
  std::vector<struct ConnectedNodeProcess> *pools[2] = {NULL, NULL};
  switch (type) {
    case TypeAsr:
      pools[0] = &_srRequests.prestartedRequests;
      pools[1] = &_srRequests.preconnectedRequests;
      break;
    case TypeRealTime:
      pools[0] = &_stRequests.prestartedRequests;
      pools[1] = &_stRequests.preconnectedRequests;
      break;
    case TypeTts:
      pools[0] = &_syRequests.prestartedRequests;
      pools[1] = &_syRequests.preconnectedRequests;
      break;
    case TypeStreamInputTts:
      pools[0] = &_fssRequests.prestartedRequests;
      pools[1] = &_fssRequests.preconnectedRequests;
      break;
    default:
      break;
  }

  for (int which = 0; which < 2; ++which) {
    std::vector<struct ConnectedNodeProcess> *pool = pools[which];
    if (NULL == pool) {
      continue;
    }
    while (pool->size() < _maxPreconnectedNumber) {
      struct ConnectedNodeProcess placeholder;
      placeholder.type = type;
      placeholder.status = PreNodeToBeCreated;
      pool->push_back(placeholder);
    }
  }
  return Success;
}
/**
 * @brief Hand an idle preconnected connection from the pool to `request`.
 *
 * Starting at the index returned by popListFront() on the type's
 * preconnected index list, looks for an entry with status PreNodeConnected
 * and canPick set whose stored request parameters equal those of `request`
 * (plus matching sdkName, and matching TTS version for TypeTts). On a match
 * the request's own SSL handle and socket are released and replaced by the
 * pooled ones, the pooled URL address is copied over, and the entry is
 * marked as borrowed by `request`.
 *
 * Fixes over the previous revision:
 *  - TypeAsr used the *prestarted* index list, so the wrong list was popped
 *    and updated for ASR requests.
 *  - The start index is validated before being used for iterator arithmetic.
 *  - Index/size values are converted to int to match the "%d" conversions.
 *
 * This function does not take _lock itself; presumably the caller holds it
 * — TODO confirm against the call sites.
 *
 * @param request Request that should receive the pooled connection.
 * @param type    Selects which per-type preconnected pool is used.
 * @return true if a pooled connection was attached, false otherwise.
 */
bool ConnectedPool::popOnePreconnectedNode(INlsRequest *request, NlsType type) {
#ifdef ENABLE_NLS_DEBUG_2
  LOG_DEBUG(
      "ConnectedPool(%p) popOnePreconnectedNode request(%p) with type(%d) "
      "...",
      this, request, type);
#endif
  std::list<int> *curList = NULL;
  std::vector<struct ConnectedNodeProcess> *curPool = NULL;
  switch (type) {
    case TypeAsr:
      curPool = &_srRequests.preconnectedRequests;
      curList = &_srRequests.preconnectedIndexList;
      break;
    case TypeRealTime:
      curPool = &_stRequests.preconnectedRequests;
      curList = &_stRequests.preconnectedIndexList;
      break;
    case TypeTts:
      curPool = &_syRequests.preconnectedRequests;
      curList = &_syRequests.preconnectedIndexList;
      break;
    case TypeStreamInputTts:
      curPool = &_fssRequests.preconnectedRequests;
      curList = &_fssRequests.preconnectedIndexList;
      break;
    default:
      break;
  }
  if (NULL == curPool) {
    return false;
  }

  // The popped index is only a starting hint. What popListFront() returns for
  // an empty list is not visible here, so fall back to a full scan whenever
  // the hint does not address a position inside the pool.
  int popIndex = popListFront(*curList);
  if (popIndex < 0 || static_cast<size_t>(popIndex) > curPool->size()) {
    popIndex = 0;
  }

  std::vector<struct ConnectedNodeProcess>::iterator it;
  for (it = curPool->begin() + popIndex; it != curPool->end(); ++it) {
    // Only connected entries whose SSL connection is idle can be taken.
    if (it->status != PreNodeConnected || !it->canPick) {
      continue;
    }

    // Decide whether the pooled request was built with the same parameters.
    bool equalFlag = false;
    INlsRequestParam *paramsInRequest = request->getRequestParam();
    INlsRequestParam *paramsInPool = it->request->getRequestParam();
    if (paramsInRequest && paramsInPool) {
      switch (type) {
        case TypeAsr:
        case TypeRealTime:
        case TypeStreamInputTts:
          equalFlag = *paramsInPool == *paramsInRequest &&
                      it->sdkName == paramsInRequest->getSdkName();
          break;
        case TypeTts:
          equalFlag = *paramsInPool == *paramsInRequest &&
                      it->sdkName == paramsInRequest->getSdkName() &&
                      it->ttsVersion == paramsInRequest->getVersion();
          break;
        default:
          break;
      }
    } else {
      LOG_ERROR(
          "ConnectedPool(%p) input invalid request(%p) params(%p) and "
          "item request(%p) params(%p).",
          this, request, paramsInRequest, it->request, paramsInPool);
    }
    if (!equalFlag) {
      continue;
    }

    const int index = static_cast<int>(std::distance(curPool->begin(), it));

    // Release the request's own connection before adopting the pooled one.
    SSLconnect *oldSSL = request->getConnectNode()->getSslHandle();
    evutil_socket_t oldSocketFd = request->getConnectNode()->getSocketFd();
    delete oldSSL;
    evutil_closesocket(oldSocketFd);

    urlAddress *dstUrlAddress =
        request->getConnectNode()->getUrlAddressPointer();
    urlAddress *srcUrlAddress =
        it->request->getConnectNode()->getUrlAddressPointer();
    request->getConnectNode()->setSslHandle(it->sslHandle);
    request->getConnectNode()->setSocketFd(it->socketFd);
    memcpy(dstUrlAddress, srcUrlAddress, sizeof(struct urlAddress));

    // Mark the entry as borrowed and remember its slot on the request.
    it->canPick = false;
    it->curRequest = request;
    it->curRequestInvalid = false;
    request->getConnectNode()->setPoolIndex(index);
    removeElement(*curList, index);
    LOG_INFO(
        "ConnectedPool(%p) popOnePreconnectedNode request(%p) "
        "node(%p) "
        "with "
        "type(%d) index(%d/%d) done, reset SSL handle %p to %p and "
        "reset "
        "SocketFd %d "
        "to %d.",
        this, request, request->getConnectNode(), type, index,
        static_cast<int>(curPool->size()), oldSSL,
        request->getConnectNode()->getSslHandle(), oldSocketFd,
        request->getConnectNode()->getSocketFd());
    return true;
  }
  return false;
}
/**
 * @brief Hand an idle prestarted connection from the pool to `request`.
 *
 * Starting at the index returned by popListFront() on the type's prestarted
 * index list, looks for an entry with status PreNodeStarted and canPick set
 * whose stored request parameters equal those of `request` (plus matching
 * SDK name, and matching version for TypeTts). On a match the request's own
 * SSL handle and socket are released and replaced by the pooled ones, the
 * pooled URL address is copied over, and the entry is marked as borrowed by
 * `request`.
 *
 * Compared with the previous revision, the start index is validated before
 * being used for iterator arithmetic, and the pool size is converted to int
 * to match the "%d" conversion in the log line.
 *
 * This function does not take _lock itself; presumably the caller holds it
 * — TODO confirm against the call sites.
 *
 * @param request Request that should receive the pooled connection.
 * @param type    Selects which per-type prestarted pool is used.
 * @return true if a pooled connection was attached, false otherwise.
 */
bool ConnectedPool::popOnePrestartedNode(INlsRequest *request, NlsType type) {
#ifdef ENABLE_NLS_DEBUG_2
  LOG_DEBUG(
      "ConnectedPool(%p) popOnePrestartedNode request(%p) with type(%d) "
      "...",
      this, request, type);
#endif
  std::list<int> *curList = NULL;
  std::vector<struct ConnectedNodeProcess> *curPool = NULL;
  switch (type) {
    case TypeAsr:
      curPool = &_srRequests.prestartedRequests;
      curList = &_srRequests.prestartedIndexList;
      break;
    case TypeRealTime:
      curPool = &_stRequests.prestartedRequests;
      curList = &_stRequests.prestartedIndexList;
      break;
    case TypeTts:
      curPool = &_syRequests.prestartedRequests;
      curList = &_syRequests.prestartedIndexList;
      break;
    case TypeStreamInputTts:
      curPool = &_fssRequests.prestartedRequests;
      curList = &_fssRequests.prestartedIndexList;
      break;
    default:
      break;
  }
  if (NULL == curPool) {
    return false;
  }

  // The popped index is only a starting hint. What popListFront() returns for
  // an empty list is not visible here, so fall back to a full scan whenever
  // the hint does not address a position inside the pool.
  int popIndex = popListFront(*curList);
  if (popIndex < 0 || static_cast<size_t>(popIndex) > curPool->size()) {
    popIndex = 0;
  }

  std::vector<struct ConnectedNodeProcess>::iterator it;
  for (it = curPool->begin() + popIndex; it != curPool->end(); ++it) {
    // Only started entries whose SSL connection is idle can be taken.
    if (it->status != PreNodeStarted || !it->canPick) {
      continue;
    }

    // Decide whether the pooled request was built with the same parameters.
    bool equalFlag = false;
    INlsRequestParam *paramsInPool = it->request->getRequestParam();
    INlsRequestParam *paramsInRequest = request->getRequestParam();
    if (paramsInRequest && paramsInPool) {
      switch (type) {
        case TypeAsr:
        case TypeRealTime:
        case TypeStreamInputTts:
          equalFlag =
              *paramsInPool == *paramsInRequest &&
              paramsInPool->getSdkName() == paramsInRequest->getSdkName();
          break;
        case TypeTts:
          equalFlag =
              *paramsInPool == *paramsInRequest &&
              paramsInPool->getSdkName() == paramsInRequest->getSdkName() &&
              paramsInPool->getVersion() == paramsInRequest->getVersion();
          break;
        default:
          break;
      }
    } else {
      LOG_ERROR(
          "ConnectedPool(%p) input invalid request(%p) params(%p) and "
          "item request(%p) params(%p).",
          this, request, paramsInRequest, it->request, paramsInPool);
    }
    if (!equalFlag) {
      continue;
    }

    const int index = static_cast<int>(std::distance(curPool->begin(), it));

    // Release the request's own connection before adopting the pooled one.
    SSLconnect *oldSSL = request->getConnectNode()->getSslHandle();
    evutil_socket_t oldSocketFd = request->getConnectNode()->getSocketFd();
    delete oldSSL;
    evutil_closesocket(oldSocketFd);

    urlAddress *dstUrlAddress =
        request->getConnectNode()->getUrlAddressPointer();
    urlAddress *srcUrlAddress =
        it->request->getConnectNode()->getUrlAddressPointer();
    request->getConnectNode()->setSslHandle(it->sslHandle);
    request->getConnectNode()->setSocketFd(it->socketFd);
    memcpy(dstUrlAddress, srcUrlAddress, sizeof(struct urlAddress));

    // Mark the entry as borrowed and remember its slot on the request.
    it->canPick = false;
    it->curRequest = request;
    it->curRequestInvalid = false;
    request->getConnectNode()->setPoolIndex(index);
    removeElement(*curList, index);
    LOG_INFO(
        "ConnectedPool(%p) popOnePrestartedNode request(%p) node(%p) "
        "with "
        "type(%d) index(%d/%d) done, reset SSL handle %p to %p and "
        "reset "
        "SocketFd %d "
        "to %d.",
        this, request, request->getConnectNode(), type, index,
        static_cast<int>(curPool->size()), oldSSL,
        request->getConnectNode()->getSslHandle(), oldSocketFd,
        request->getConnectNode()->getSocketFd());
    return true;
  }
  return false;
}
/**
 * @brief Destroy every entry of a pool and leave the pool empty.
 *
 * For each entry the owning request (if any) is deleted and the connection
 * fields are reset; afterwards the vector is cleared. A NULL pool is ignored.
 *
 * @param pool Pool to tear down; may be NULL.
 */
void ConnectedPool::deletePreNode(
    std::vector<struct ConnectedNodeProcess> *pool) {
  if (NULL == pool) {
    return;
  }
  for (size_t slot = 0; slot < pool->size(); ++slot) {
    struct ConnectedNodeProcess &node = (*pool)[slot];
    if (node.request) {
      delete node.request;
      node.request = NULL;
    }
    node.socketFd = INVALID_SOCKET;
    node.sslHandle = NULL;
    node.curRequest = NULL;
    node.canPick = false;
  }
  pool->clear();
}
int ConnectedPool::timeoutPrestartedNode(
std::vector<struct ConnectedNodeProcess> *pool) {
int releaseCount = 0;
std::vector<struct ConnectedNodeProcess>::iterator it;
for (it = pool->begin(); it != pool->end(); ++it) {
if (it->status == PreNodeStarted) {
uint64_t curTimestamp = utility::TextUtils::GetTimestampMs();
// LOG_DEBUG(
// "it->request:%p, "
// "it->curRequest->getConnectNode():%p",
// it->request);
// update workableTimestamp from current request
if (!it->curRequestInvalid && it->curRequest &&
it->curRequest->getConnectNode() &&
it->curRequest->getConnectNode()->getNodeProcess() &&
it->curRequest->getConnectNode()
->getNodeProcess()
->last_op_timestamp_ms > 0) {
// LOG_DEBUG(
// "it->curRequest:%p, "
// "it->curRequest->getConnectNode():%p",
// it->curRequest, it->curRequest->getConnectNode());
it->workableTimestamp = it->curRequest->getConnectNode()
->getNodeProcess()
->last_op_timestamp_ms;
}
uint64_t gapTimestamp = curTimestamp - it->workableTimestamp;
bool timeout = gapTimestamp >= _prerequestedTimeoutMs;
bool tokenTimeout = it->tokenExpirationTimestamp > 0 &&
curTimestamp >= it->tokenExpirationTimestamp;
if (timeout || tokenTimeout) {
if (it->curRequest == NULL) {
LOG_WARN(
"Pool(%p:(%p)%p-%p) connectPoolEventCallback should release "
"prestarted "
"request(%p) SSL handler(%p) index(%d) "
"because of %llums. start timestamp is %llu(%s), last "
"operate timestamp is %llu(%s), and token expiration timestamp "
"is %llu(%s).",
this, pool, pool->begin(), pool->end(), it->request,
it->sslHandle, std::distance(pool->begin(), it), gapTimestamp,
it->startTimestamp,
utility::TextUtils::GetTimeFromMs(it->startTimestamp).c_str(),
it->workableTimestamp,
utility::TextUtils::GetTimeFromMs(it->workableTimestamp).c_str(),
it->tokenExpirationTimestamp,
utility::TextUtils::GetTimeFromMs(it->tokenExpirationTimestamp)
.c_str());
// empty this node, delete request later
it->status = PreNodeToBeCreated;
it->startTimestamp = 0;
it->workableTimestamp = 0;
it->tokenExpirationTimestamp = 0;
it->socketFd = INVALID_SOCKET;
it->sslHandle = NULL;
it->ttsVersion = 0;
it->sdkName.clear();
it->startedResponse.clear();
it->canPick = false;
it->curRequestInvalid = false;
it->shouldRelease = true;
if (!tokenTimeout) {
it->shouldPreconnect = true;
}
releaseCount++;
} else {
LOG_WARN(
"Pool(%p:(%p)%p-%p) connectPoolEventCallback prestarted "
"request(%p) curRequest(%p) idle "
"timeout:%llums. start timestamp is %llu(%s). But request(%p) is "
"using, check later ...",
this, pool, pool->begin(), pool->end(), it->request,
it->curRequest, gapTimestamp, it->startTimestamp,
utility::TextUtils::GetTimeFromMs(it->startTimestamp).c_str(),
it->curRequest);
}
} else {
// valid connection
int pingResult = Success;
if (it->request && it->request->getConnectNode()) {
pingResult = it->request->getConnectNode()->syncPingCmd();
}
if (pingResult != Success) {
LOG_WARN(
"Pool(%p:(%p)%p-%p) connectPoolEventCallback should release "
"prestarted "
"request(%p) SSL handler(%p) index(%d) "
"because of ping failed. start timestamp is %llu(%s), last "
"operate timestamp is %llu(%s), and token expiration timestamp "
"is %llu(%s).",
this, pool, pool->begin(), pool->end(), it->request,
it->sslHandle, std::distance(pool->begin(), it),
it->startTimestamp,
utility::TextUtils::GetTimeFromMs(it->startTimestamp).c_str(),
it->workableTimestamp,
utility::TextUtils::GetTimeFromMs(it->workableTimestamp).c_str(),
it->tokenExpirationTimestamp,
utility::TextUtils::GetTimeFromMs(it->tokenExpirationTimestamp)
.c_str());
// empty this node, delete request later
it->status = PreNodeToBeCreated;
it->startTimestamp = 0;
it->workableTimestamp = 0;
it->socketFd = INVALID_SOCKET;
it->sslHandle = NULL;
it->ttsVersion = 0;
it->sdkName.clear();
it->startedResponse.clear();
it->canPick = false;
it->shouldPreconnect = false;
it->curRequestInvalid = false;
it->shouldRelease = true;
releaseCount++;
}
// LOG_DEBUG(
// "Pool(%p) connectPoolEventCallback "
// "request(%p) %llu - %llu = %llu.",
// this, it->request, curTimestamp, it->workableTimestamp,
// gapTimestamp);
}
}
} // for
return releaseCount;
}
// Scans one pool and flags every node in PreNodeConnected state that has to
// be torn down.
//
// A node is flagged when
//   * it has been idle for at least _preconnectedTimeoutMs, or its token
//     expiration timestamp has passed, and no curRequest is attached; or
//   * it is still inside its time window but syncPingCmd() on its connect
//     node does not return Success.
// Flagging only resets the node's bookkeeping and sets shouldRelease (plus
// shouldPreconnect for a plain idle timeout). The request object itself is
// deleted afterwards by deleteOrPreconnectNodeShouldReleased().
//
// pool: node vector to scan.
// Returns the number of nodes flagged during this scan.
int ConnectedPool::timeoutPreconnectedNode(
    std::vector<struct ConnectedNodeProcess> *pool) {
  int releaseCount = 0;
  std::vector<struct ConnectedNodeProcess>::iterator it;
  for (it = pool->begin(); it != pool->end(); ++it) {
    if (it->status != PreNodeConnected) {
      continue;
    }

    uint64_t curTimestamp = utility::TextUtils::GetTimestampMs();
    // update workableTimestamp from current request
    if (!it->curRequestInvalid && it->curRequest &&
        it->curRequest->getConnectNode() &&
        it->curRequest->getConnectNode()->getNodeProcess() &&
        it->curRequest->getConnectNode()
                ->getNodeProcess()
                ->last_op_timestamp_ms > 0) {
      it->workableTimestamp = it->curRequest->getConnectNode()
                                  ->getNodeProcess()
                                  ->last_op_timestamp_ms;
    }
    // Unsigned subtraction: a workableTimestamp newer than curTimestamp would
    // wrap around to a huge gap and be reported as an idle timeout, so clamp
    // the gap to zero in that case.
    uint64_t gapTimestamp = 0;
    if (curTimestamp > it->workableTimestamp) {
      gapTimestamp = curTimestamp - it->workableTimestamp;
    }
    bool timeout = gapTimestamp >= _preconnectedTimeoutMs;
    bool tokenTimeout = it->tokenExpirationTimestamp > 0 &&
                        curTimestamp >= it->tokenExpirationTimestamp;
    // std::distance() yields a ptrdiff_t; the log formats below use %d.
    int nodeIndex = static_cast<int>(std::distance(pool->begin(), it));

    if (timeout || tokenTimeout) {
      if (it->curRequest == NULL) {
        LOG_WARN(
            "Pool(%p:(%p)%p-%p) connectPoolEventCallback should release "
            "preconnected "
            "request(%p) SSL handler(%p) index(%d) "
            "because of %llums. start timestamp is %llu(%s), last "
            "operate timestamp is %llu(%s), and token expiration timestamp "
            "is %llu(%s).",
            this, pool, pool->begin(), pool->end(), it->request,
            it->sslHandle, nodeIndex, gapTimestamp, it->startTimestamp,
            utility::TextUtils::GetTimeFromMs(it->startTimestamp).c_str(),
            it->workableTimestamp,
            utility::TextUtils::GetTimeFromMs(it->workableTimestamp).c_str(),
            it->tokenExpirationTimestamp,
            utility::TextUtils::GetTimeFromMs(it->tokenExpirationTimestamp)
                .c_str());
        // empty this node, delete request later
        it->status = PreNodeToBeCreated;
        it->startTimestamp = 0;
        it->workableTimestamp = 0;
        // Reset the same fields as the ping-failure branch below so that no
        // stale token expiry or SDK name stays on the emptied slot.
        it->tokenExpirationTimestamp = 0;
        it->socketFd = INVALID_SOCKET;
        it->sslHandle = NULL;
        it->ttsVersion = 0;
        it->sdkName.clear();
        it->startedResponse.clear();
        it->canPick = false;
        it->shouldRelease = true;
        // Only a plain idle timeout asks for a replacement connection; an
        // expired token leaves shouldPreconnect untouched.
        if (!tokenTimeout) {
          it->shouldPreconnect = true;
        }
        it->curRequestInvalid = false;
        releaseCount++;
      } else {
        // A curRequest is still attached to this node: only report it, the
        // next scan checks again.
        LOG_WARN(
            "Pool(%p:(%p)%p-%p) connectPoolEventCallback preconnected "
            "request(%p) curRequest(%p) idle "
            "timeout:%llums. start timestamp is %llu(%s). But request(%p) is "
            "using, check later ...",
            this, pool, pool->begin(), pool->end(), it->request,
            it->curRequest, gapTimestamp, it->startTimestamp,
            utility::TextUtils::GetTimeFromMs(it->startTimestamp).c_str(),
            it->curRequest);
      }
    } else {
      // valid connection
      int pingResult = Success;
      if (it->request && it->request->getConnectNode()) {
        pingResult = it->request->getConnectNode()->syncPingCmd();
      }
      if (pingResult != Success) {
        LOG_WARN(
            "Pool(%p:(%p)%p-%p) connectPoolEventCallback should release "
            "preconnected "
            "request(%p) SSL handler(%p) index(%d) "
            "because of ping failed(%d). start timestamp is %llu(%s), last "
            "operate timestamp is %llu(%s), and token expiration timestamp "
            "is %llu(%s).",
            this, pool, pool->begin(), pool->end(), it->request,
            it->sslHandle, nodeIndex, pingResult, it->startTimestamp,
            utility::TextUtils::GetTimeFromMs(it->startTimestamp).c_str(),
            it->workableTimestamp,
            utility::TextUtils::GetTimeFromMs(it->workableTimestamp).c_str(),
            it->tokenExpirationTimestamp,
            utility::TextUtils::GetTimeFromMs(it->tokenExpirationTimestamp)
                .c_str());
        // empty this node, delete request later
        it->status = PreNodeToBeCreated;
        it->startTimestamp = 0;
        it->workableTimestamp = 0;
        it->tokenExpirationTimestamp = 0;
        it->socketFd = INVALID_SOCKET;
        it->sslHandle = NULL;
        it->ttsVersion = 0;
        it->sdkName.clear();
        it->startedResponse.clear();
        it->canPick = false;
        // A failed ping never requests a replacement connection.
        it->shouldPreconnect = false;
        it->curRequestInvalid = false;
        it->shouldRelease = true;
        releaseCount++;
      }
    }
  }  // for
  return releaseCount;
}
void ConnectedPool::deleteOrPreconnectNodeShouldReleased(
std::vector<struct ConnectedNodeProcess> *pool, std::string name) {
// LOG_DEBUG("Pool(%p:(%p)) Name(%s) begin ...", this, pool, name.c_str());
std::vector<struct ConnectedNodeProcess>::iterator it;
for (it = pool->begin(); it != pool->end(); ++it) {
// LOG_DEBUG(
// "Pool(%p:(%p)) Name(%s) Request(%p) index(%d) shouldRelease(%s) "
// "shouldPreconnect(%s) ...",
// this, pool, name.c_str(), it->request, std::distance(pool->begin(),
// it), it->shouldRelease ? "true" : "false", it->shouldPreconnect ?
// "true" : "false");
if (it->shouldRelease) {
it->shouldRelease = false;
if (it->shouldPreconnect) {
LOG_INFO(
"Request(%p) %s index(%d) needs to preconnect now and then delete "
"...",
it->request, name.c_str(), std::distance(pool->begin(), it));
it->shouldPreconnect = false;
preconnectNodeByRequest(it->request); // it->request is old request
// LOG_DEBUG("Request(%p) push into pool finish.", it->request);
} else {
LOG_INFO(
"Request(%p) %s index(%d) needs to be deleted now because the "
"timeout "
"...",
it->request, name.c_str(), std::distance(pool->begin(), it));
}
// delete old request
if (it->request) {
bool release_lock_ret = true;
NlsClientImpl *cur_instance =
it->request->getConnectNode()->getInstance();
if (cur_instance != NULL) {
MUTEX_TRY_LOCK_WITH_TAG(cur_instance->_mtxReleaseRequestGuard, 2000,
release_lock_ret, it->request);
if (!release_lock_ret) {
LOG_ERROR("Request(%p) lock destroy failed, deadlock has occurred",
it->request);
}
} else {
LOG_ERROR("Request(%p) just only created ...", it->request);
release_lock_ret = false;
}
delete it->request;
if (release_lock_ret) {
MUTEX_UNLOCK_WITH_TAG(cur_instance->_mtxReleaseRequestGuard,
it->request);
}
}
// empty this node
it->clearNode();
// it->status = PreNodeToBeCreated;
// it->startTimestamp = 0;
// it->workableTimestamp = 0;
// it->socketFd = INVALID_SOCKET;
// it->sslHandle = NULL;
// it->ttsVersion = 0;
// it->startedResponse = "";
// it->curRequest = NULL;
// it->canPick = false;
// it->request = NULL;
// it->isAbnormal = false;
} // shouldRelease is true
} // for
// LOG_DEBUG("Pool(%p:(%p)) Name(%s) done.", this, pool, name.c_str());
}
// Creates a fresh request of the same type as `request`, copies the old
// request's parameters into it, starts it step by step and, when the direct
// link from the IP cache succeeds, pushes it into the preconnected pool.
//
// The old request is only read here; it is not deleted or modified.
// When starting or linking fails, the new request is handed back to the
// matching NlsClient release function. When the push into the pool fails,
// the node is removed again through deletePreNodeBySSL().
//
// request: old request used as the template; a null request, or one without
//          a request param or connect node, is logged and ignored.
void ConnectedPool::preconnectNodeByRequest(INlsRequest *request) {
  if (request == NULL) {
    LOG_ERROR("ConnectedPool(%p) preconnectNodeByRequest request is null.",
              this);
    return;
  }

  INlsRequestParam *requestParam = request->getRequestParam();
  ConnectNode *node = request->getConnectNode();
  if (requestParam == NULL || node == NULL) {
    // Both are dereferenced below, so nothing can be cloned without them.
    LOG_ERROR(
        "ConnectedPool(%p) preconnectNodeByRequest request(%p) has no "
        "request param(%p) or connect node(%p).",
        this, request, requestParam, node);
    return;
  }

  NlsType type = requestParam->_mode;
  LOG_DEBUG(
      "ConnectedPool(%p) preconnectNodeByRequest old request(%p) with "
      "type:%d.",
      this, request, type);

  INlsRequest *newRequest = NULL;
  switch (type) {
    case TypeAsr:
      newRequest = NlsClient::getInstance()->createRecognizerRequest(
          requestParam->getSdkName().c_str(), node->isLongConnection());
      break;
    case TypeRealTime:
      newRequest = NlsClient::getInstance()->createTranscriberRequest(
          requestParam->getSdkName().c_str(), node->isLongConnection());
      break;
    case TypeTts:
      newRequest = NlsClient::getInstance()->createSynthesizerRequest(
          (TtsVersion)requestParam->getVersion(),
          requestParam->getSdkName().c_str(), node->isLongConnection());
      break;
    case TypeStreamInputTts:
      newRequest = NlsClient::getInstance()->createFlowingSynthesizerRequest(
          requestParam->getSdkName().c_str(), node->isLongConnection());
      break;
    default:
      break;
  }  // switch
  if (newRequest == NULL) {
    // Unknown type or the factory returned nothing: nothing to preconnect.
    return;
  }

  LOG_INFO(
      "ConnectedPool(%p) create new request(%p) from old request(%p) for "
      "PreconnectedPool.",
      this, newRequest, request);
  // Clone the old request's parameters into the new request.
  *(newRequest->getRequestParam()) = *(request->getRequestParam());
  newRequest->getConnectNode()->usePreNodeStartStepByStep(true);

  int ret = NlsEventNetWork::_eventClient->startInner(newRequest);
  bool result = false;
  if (ret == Success) {
    result = newRequest->getConnectNode()->directLinkIpFromCache();
  }

  if (result) {
    result = pushPreconnectedNode(newRequest, type, true);
    if (result) {
      finishPushPreNode(type, newRequest->getConnectNode()->getSocketFd(),
                        newRequest->getConnectNode()->getSslHandle(),
                        newRequest->getConnectNode()->getPoolIndex(),
                        newRequest);
    } else {
      deletePreNodeBySSL(newRequest->getConnectNode()->getSslHandle(), type);
    }
    return;
  }

  // Start or direct link failed: give the new request back to the client.
  switch (type) {
    case TypeAsr:
      NlsClient::getInstance()->releaseRecognizerRequest(
          (SpeechRecognizerRequest *)newRequest);
      break;
    case TypeRealTime:
      NlsClient::getInstance()->releaseTranscriberRequest(
          (SpeechTranscriberRequest *)newRequest);
      break;
    case TypeTts:
      NlsClient::getInstance()->releaseSynthesizerRequest(
          (SpeechSynthesizerRequest *)newRequest);
      break;
    case TypeStreamInputTts:
      NlsClient::getInstance()->releaseFlowingSynthesizerRequest(
          (FlowingSynthesizerRequest *)newRequest);
      break;
    default:
      break;
  }  // switch
}
// Debug helper: writes one group of LOG_DEBUG lines per node of `pool`
// (status, request pointers, socket/SSL handles, flags and timestamps),
// framed by a begin and an end line.
//
// pool: node vector to dump.
// name: label of the pool, used only in log output.
void ConnectedPool::showEveryNode(
    std::vector<struct ConnectedNodeProcess> *pool, std::string name) {
  // Text used for every boolean flag: index 0 -> false, index 1 -> true.
  static const char *const kFlagText[2] = {"false", "true"};

  LOG_DEBUG("==>> ConnectedPool(%p:(%p)%p-%p) show every node in pool %s ...",
            this, pool, pool->begin(), pool->end(), name.c_str());
  LOG_DEBUG(" %s =>", name.c_str());

  std::vector<struct ConnectedNodeProcess>::iterator cur = pool->begin();
  while (cur != pool->end()) {
    LOG_DEBUG(" index(%d) status:%s", std::distance(pool->begin(), cur),
              getStatusStr(cur->status).c_str());
    LOG_DEBUG(" request:%p curRequest:%p", cur->request, cur->curRequest);
    if (cur->request != NULL) {
      LOG_DEBUG(" node:%p", cur->request->getConnectNode());
    }
    LOG_DEBUG(" socketFd:%d sslHandle:%p", cur->socketFd, cur->sslHandle);
    // The handles held by the request's own connect node are shown next to
    // the values cached on the pool node.
    if (cur->request != NULL && cur->request->getConnectNode() != NULL) {
      LOG_DEBUG(" node socketFd:%d sslHandle:%p",
                cur->request->getConnectNode()->getSocketFd(),
                cur->request->getConnectNode()->getSslHandle());
    }
    LOG_DEBUG(
        " shouldRelease:%s shouldPreconnect:%s isAbnormal:%s "
        "canPick:%s curRequestInvalid:%s",
        kFlagText[cur->shouldRelease ? 1 : 0],
        kFlagText[cur->shouldPreconnect ? 1 : 0],
        kFlagText[cur->isAbnormal ? 1 : 0], kFlagText[cur->canPick ? 1 : 0],
        kFlagText[cur->curRequestInvalid ? 1 : 0]);
    LOG_DEBUG(" start:%s workable:%s tokenExpiration:%s",
              utility::TextUtils::GetTimeFromMs(cur->startTimestamp).c_str(),
              utility::TextUtils::GetTimeFromMs(cur->workableTimestamp).c_str(),
              utility::TextUtils::GetTimeFromMs(cur->tokenExpirationTimestamp)
                  .c_str());
    ++cur;
  }

  LOG_DEBUG("<<== ConnectedPool(%p) show every node done", this);
}
// Maps a ConnectedStatus value to its enumerator name for log output.
// Any value without a dedicated name yields "unknown".
std::string ConnectedPool::getStatusStr(ConnectedStatus status) {
  const char *label = "unknown";
  if (status == PreNodeInvalid) {
    label = "PreNodeInvalid";
  } else if (status == PreNodeToBeCreated) {
    label = "PreNodeToBeCreated";
  } else if (status == PreNodeCreated) {
    label = "PreNodeCreated";
  } else if (status == PreNodeConnected) {
    label = "PreNodeConnected";
  } else if (status == PreNodeStarted) {
    label = "PreNodeStarted";
  }
  return std::string(label);
}
// Inserts `a` into `lst` in front of the first element that is not less than
// `a`, as located by std::lower_bound.
// NOTE(review): std::lower_bound requires `lst` to already be in ascending
// order; presumably every insertion goes through this function - confirm.
void ConnectedPool::insertListInOrder(std::list<int> &lst, int a) {
  lst.insert(std::lower_bound(lst.begin(), lst.end(), a), a);
}
// Removes every element of `lst` that compares equal to `a`; the relative
// order of the remaining elements is unchanged.
void ConnectedPool::removeElement(std::list<int> &lst, int a) {
  std::list<int>::iterator pos = lst.begin();
  while (pos != lst.end()) {
    if (*pos == a) {
      pos = lst.erase(pos);  // erase() returns the element after the erased one
    } else {
      ++pos;
    }
  }
}
// Removes the first element of `lst` and returns it.
// An empty list is left untouched and 0 is returned, so the result cannot be
// told apart from a stored 0.
int ConnectedPool::popListFront(std::list<int> &lst) {
  if (lst.empty()) {
    return 0;
  }
  const int head = lst.front();
  lst.pop_front();
  return head;
}
} // namespace AlibabaNls