nlsCppSdk/transport/connectNode.h (456 lines of code) (raw):

/* * Copyright 2021 Alibaba Group Holding Limited * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #ifndef NLS_SDK_CONNECT_NODE_H #define NLS_SDK_CONNECT_NODE_H #if defined(_MSC_VER) #include <windows.h> #else #include <arpa/inet.h> #include <netinet/in.h> #include <pthread.h> #include <sys/socket.h> #endif #include <stdint.h> #include <queue> #include <string> #include "SSLconnect.h" #include "error.h" #include "event.h" #include "event2/buffer.h" #include "event2/dns.h" #include "event2/util.h" #include "nlsClientImpl.h" #include "nlsEncoder.h" #include "nlsGlobal.h" #include "webSocketFrameHandleBase.h" #include "webSocketTcp.h" #ifdef ENABLE_PRECONNECTED_POOL #include "connectedPool.h" #endif #if defined(ENABLE_REQUEST_RECORDING) || defined(ENABLE_CONTINUED) #include "json/json.h" #endif namespace AlibabaNls { class INlsRequest; class WorkThread; class NlsEventNetWork; #if defined(_MSC_VER) #define NLS_ERR_IS_EAGAIN(e) ((e) == WSAEWOULDBLOCK || (e) == EAGAIN) #define NLS_ERR_RW_RETRIABLE(e) ((e) == WSAEWOULDBLOCK || (e) == WSAEINTR) #define NLS_ERR_CONNECT_RETRIABLE(e) \ ((e) == WSAEWOULDBLOCK || (e) == WSAEINTR || (e) == WSAEINPROGRESS || \ (e) == WSAEINVAL) #define NLS_ERR_ACCEPT_RETRIABLE(e) EVUTIL_ERR_RW_RETRIABLE(e) #define NLS_ERR_CONNECT_REFUSED(e) ((e) == WSAECONNREFUSED) #else #define INVALID_SOCKET -1 #if EAGAIN == EWOULDBLOCK #define NLS_ERR_IS_EAGAIN(e) ((e) == EAGAIN) #else #define NLS_ERR_IS_EAGAIN(e) ((e) == EAGAIN || (e) == EWOULDBLOCK) #endif /*EAGAIN == EWOULDBLOCK*/ /* True iff e is an error that means a read/write operation can be retried. */ #define NLS_ERR_RW_RETRIABLE(e) ((e) == EINTR || NLS_ERR_IS_EAGAIN(e)) /* True iff e is an error that means an connect can be retried. */ #define NLS_ERR_CONNECT_RETRIABLE(e) ((e) == EINTR || (e) == EINPROGRESS) /* True iff e is an error that means a accept can be retried. */ #define NLS_ERR_ACCEPT_RETRIABLE(e) \ ((e) == EINTR || NLS_ERR_IS_EAGAIN(e) || (e) == ECONNABORTED) /* True iff e is an error that means the connection was refused */ #define NLS_ERR_CONNECT_REFUSED(e) ((e) == ECONNREFUSED) #endif /*#if defined(_MSC_VER)*/ #define CLOSE_JSON_STRING "{\"channelClosed\": \"nls request finished.\"}" #define TASKFAILED_CONNECT_JSON_STRING "connect failed." #define TASKFAILED_PARSE_JSON_STRING \ "{\"TaskFailed\": \"JSON: Json parse failed.\"}" #define TASKFAILED_NEW_NLSEVENT_FAILED \ "{\"TaskFailed\": \"new NlsEvent failed, memory is not enough.\"}" #define TASKFAILED_UTF8_JSON_STRING "{\"TaskFailed\": \"utf8ToGbk failed.\"}" #define TASKFAILED_WS_JSON_STRING \ "{\"TaskFailed\": \"WEBSOCKET: unkown head type.\"}" #define TASKFAILED_ERROR_CLOSE_STRING \ "{\"TaskFailed\": \"WEBSOCKET: invalid closeCode of wsFrame.\"}" /* EventNetWork发送Node的具体指令 */ enum CmdType { CmdStart = 0, CmdStop, CmdStControl, CmdTextDialog, CmdExecuteDialog, CmdWarkWord, CmdCancel, CmdSendText, CmdSendPing, CmdSendFlush, }; /* Node处于的退出状态 */ enum ExitStatus { ExitInvalid = 0, /* 构造时, 未处于退出状态 */ ExitStopping, /* 调用stop时设置ExitStopping */ ExitCancel, /* 调用cancel时设置ExitCancel */ }; /* Node处于的最新运行状态 */ enum ConnectStatus { NodeInvalid = 0, /* node处于不可用或者释放状态 */ NodeCreated, /* 构造node */ NodeInvoking, /* 刚调用start的过程, 向notifyEventCallback发送c指令 */ NodeInvoked, /* 调用start的过程, 在notifyEventCallback完成 */ NodeConnecting, /* 正在dns解析 */ NodeConnected, /* socket链接成功 */ NodeHandshaking, /* ssl握手中 */ NodeHandshaked, /* 握手成功 */ NodeStarting, /* 握手后收到response, 开始工作 */ NodeStarted, /* 收到started response时 */ NodeWakeWording = 10, NodeFailed, NodeCompleted, NodeClosed, NodeReleased, NodeStop, NodeCancel, NodeSendAudio, NodeSendControl, NodePlayAudio, NodeSendText }; enum ConnectType { ConnectWithSSL, ConnectWithDirectIP, ConnectWithIpCache, ConnectWithLongConnect, ConnectWithPrestartedNodePool, ConnectWithPreconnectedNodePool, }; #ifdef ENABLE_REQUEST_RECORDING /* Node运行过程记录 */ struct NodeProcess { public: explicit NodeProcess() { create_timestamp_ms = 0; start_timestamp_ms = 0; started_timestamp_ms = 0; stop_timestamp_ms = 0; cancel_timestamp_ms = 0; first_binary_timestamp_ms = 0; last_send_timestamp_ms = 0; last_ctrl_timestamp_ms = 0; failed_timestamp_ms = 0; completed_timestamp_ms = 0; closed_timestamp_ms = 0; last_op_timestamp_ms = 0; last_status = NodeInvalid; recording_bytes = 0; send_count = 0; play_bytes = 0; play_count = 0; /* about API */ api_start_run = false; api_stop_run = false; api_cancel_run = false; api_send_run = false; api_ctrl_run = false; last_api_timestamp_ms = 0; /* about CALLBACK */ last_callback = NlsEvent::TaskFailed; last_cb_start_timestamp_ms = 0; last_cb_end_timestamp_ms = 0; last_cb_run = false; connect_type = ConnectWithSSL; }; ~NodeProcess(){}; uint64_t create_timestamp_ms; uint64_t start_timestamp_ms; uint64_t started_timestamp_ms; uint64_t stop_timestamp_ms; uint64_t cancel_timestamp_ms; uint64_t first_binary_timestamp_ms; uint64_t last_send_timestamp_ms; uint64_t last_ctrl_timestamp_ms; uint64_t failed_timestamp_ms; uint64_t completed_timestamp_ms; uint64_t closed_timestamp_ms; uint64_t last_op_timestamp_ms; ConnectStatus last_status; uint64_t recording_bytes; uint64_t send_count; uint64_t play_bytes; uint64_t play_count; /* about API */ bool api_start_run; bool api_stop_run; bool api_cancel_run; bool api_send_run; bool api_ctrl_run; uint64_t last_api_timestamp_ms; /* about CALLBACK */ NlsEvent::EventType last_callback; uint64_t last_cb_start_timestamp_ms; uint64_t last_cb_end_timestamp_ms; bool last_cb_run; ConnectType connect_type; }; #endif #ifdef ENABLE_CONTINUED /* Node运行中断自动重连的状态记录 */ struct NodeReconnection { public: enum { max_try_count = 4, reconnect_interval_ms = 100 }; enum ReconnectionState { NoReconnection = 0, // The reconnection has not been triggered WillReconnect, // Trigger reconnection, will event_add(launch) TriggerReconnection, // Triggers a new reconnection, new node launched NewReconnectionStarting, // New node is running }; explicit NodeReconnection() { state = NoReconnection; reconnected_count = 0; tw_index_offset = 0; interruption_timestamp_ms = 0; first_audio_timestamp_ms = 0; }; ~NodeReconnection(){}; ReconnectionState state; uint32_t reconnected_count; uint64_t tw_index_offset; uint64_t interruption_timestamp_ms; uint64_t first_audio_timestamp_ms; }; #endif class ConnectNode { public: ConnectNode(INlsRequest *request, HandleBaseOneParamWithReturnVoid<NlsEvent> *handler, bool isLongConnection = false); virtual ~ConnectNode(); /* 1. about pointer and status of this node */ /* 1.1. something about instance&request&eventThread of this node */ inline void setInstance(NlsClientImpl *instance) { _instance = instance; } inline NlsClientImpl *getInstance() { return _instance; } inline INlsRequest *getRequest() { return _request; } inline void setRequest(INlsRequest *request) { _request = request; } inline WorkThread *getEventThread() { return _eventThread; } inline void setEventThread(WorkThread *thread) { _eventThread = thread; } /* 1.2. get event point of launching node */ struct event *getLaunchEvent(bool init = false); #ifdef ENABLE_PRECONNECTED_POOL struct event *getStartWithPoolEvent(bool init = false); #endif struct event *getSingleRoundTextEvent(); bool _isSendSingleRoundText; /* 1.3. something about status of this node */ /* design to record work status */ ConnectStatus getConnectNodeStatus(); std::string getConnectNodeStatusString(); std::string getConnectNodeStatusStringLocked(); std::string getConnectNodeStatusString(ConnectStatus status); void setConnectNodeStatus(ConnectStatus status); /* 1.4. UUID of this node */ inline std::string getNodeUUID() { return _nodeUUID; } /* design to record exit status */ ExitStatus getExitStatus(); std::string getExitStatusString(); /* design to record wakeup status */ bool getWakeStatus(); /* take effect all setting parameters */ void updateParameters(); /* design to long connection */ inline bool isLongConnection() { return _isLongConnection; } inline bool useLongConnection(bool enable) { _isLongConnection = enable; } inline void setConnected(bool isConnected) { _isConnected = isConnected; } /* design to preconnection */ inline bool isUsingPreconnection() { return _usePreconnection; } inline void usePreconnection(bool usePreconnection) { _usePreconnection = usePreconnection; } inline bool isPreconnecting() { return _isPreconnecting; } inline void usePreNodeStartStepByStep(bool enable) { _isPreNodeStartStepByStep = enable; } inline bool isPreNodeStartStepByStep() { return _isPreNodeStartStepByStep; } void initAllStatus(); /*init all status in longConnection mode*/ /* 1.4. about error */ inline int getErrorCode() { return _nodeErrCode; }; inline const char *getErrorMsg() { return _nodeErrMsg.c_str(); }; /* 2. command operation of request */ /* 2.1. run command */ void addCmdDataBuffer(CmdType type, const char *message = NULL); int cmdNotify(CmdType type, const char *message); #ifdef ENABLE_PRECONNECTED_POOL int syncPingCmd(); #endif /* 2.2. evBuffer for command */ inline struct evbuffer *getBinaryEvBuffer() { return _binaryEvBuffer; }; inline struct evbuffer *getCmdEvBuffer() { return _cmdEvBuffer; }; inline struct evbuffer *getWwvEvBuffer() { return _wwvEvBuffer; }; /* 2.3. ev for command */ inline struct event *getReadEvent() { return _readEvent; }; inline struct event *getWriteEvent() { return _writeEvent; }; /* 3. send command and audio data */ /* 3.1. send audio data */ int addAudioDataBuffer(const uint8_t *frame, size_t length); int addSlicedAudioDataBuffer(const uint8_t *frame, size_t length); /* 3.2. parse&send request */ int sendControlDirective(); int gatewayRequest(); int nlsSendFrame(struct evbuffer *eventBuffer, bool audio_frame = false); /* 4. recv response and parse */ int gatewayResponse(); int webSocketResponse(); /* 5. something about network */ inline evutil_socket_t getSocketFd() { return _socketFd; } inline void setSocketFd(evutil_socket_t fd) { _socketFd = fd; } inline SSLconnect *getNativeSslHandle() { return _nativeSslHandle; } inline SSLconnect *getSslHandle() { return _sslHandle; } inline void setSslHandle(SSLconnect *handle) { _sslHandle = handle; } inline struct event *getConnectEvent() { return _connectEvent; } inline struct timeval *getConnectTv() { return &_connectTv; } inline urlAddress getUrlAddress() { return _url; } inline urlAddress *getUrlAddressPointer() { return &_url; } inline struct timeval *getRecvTvPointer() { return &_recvTv; } inline bool getEnableRecvTv() { return _enableRecvTv; } int dnsProcess(int aiFamily, char *directIp, bool sysGetAddr); int socketConnect(); int connectProcess(const char *ip, int aiFamily); int sslProcess(); void disconnectProcess(); #ifdef ENABLE_PRECONNECTED_POOL int prestartProcess(); int prestartEventDelProcess(); bool directLinkIpFromCache(); int syncSocketConnect(); int syncConnectProcess(const char *ip, int aiFamily); int syncSslProcess(); #endif /* 6. exit operation */ void closeConnectNode(); /* about status of node in destroy */ bool updateDestroyStatus(); /* about event of callback */ void delAllEvents(); /* 7. something about other modules */ /* init encoder about opus&opu */ void initNlsEncoder(); /* 8. design to native_getaddrinfo */ #ifdef __LINUX__ char *_nodename; char *_servname; pthread_t _dnsThread; /*异步dns方案启动线程*/ bool _dnsThreadExit; bool _dnsThreadRunning; /*异步dns方案线程已经启动*/ struct timespec _outtime; /*异步dns方案超时设置*/ struct gaicb *_gaicbRequest[1]; struct event *_dnsEvent; int _dnsErrorCode; struct evutil_addrinfo *_addrinfo; static void *async_dns_resolve_thread_fn(void *arg); #endif int _dnsRequestCallbackStatus; /* 1:开始DNS; 2:结束DNS */ /* 9. design for thread safe */ #if defined(_MSC_VER) HANDLE _mtxNode; HANDLE _mtxCloseNode; HANDLE _mtxEventCallbackNode; #else pthread_mutex_t _mtxNode; pthread_mutex_t _mtxCloseNode; pthread_mutex_t _mtxEventCallbackNode; pthread_cond_t _cvEventCallbackNode; /*释放过程中等待事件回调结束*/ #endif bool _inEventCallbackNode; /*是否处于事件回调中*/ bool _releasingFlag; /*处于释放中*/ bool _waitEventCallbackAbnormally; /*处于异常状态*/ /* 10. design for sync call */ inline void setSyncCallTimeout(unsigned int timeout_ms) { _syncCallTimeoutMs = timeout_ms; } inline unsigned int getSyncCallTimeout() { return _syncCallTimeoutMs; } void waitInvokeFinish(); /* 11. about listener */ void handlerTaskFailedEvent(std::string failedInfo, int code = DefaultErrorCode); #ifdef ENABLE_REQUEST_RECORDING /* 12. design for recording process */ void updateNodeProcess(std::string api, int status, bool enter, size_t size); const char *dumpAllInfo(); inline struct NodeProcess *getNodeProcess() { return &_nodeProcess; } std::string getConnectTypeStr(); #endif #ifdef ENABLE_CONTINUED /* 13. design for reconnection automatically */ struct event *getReconnectEvent(); struct NodeReconnection _reconnection; #endif /* 14. others */ void sendFakeSynthesisStarted(); #ifdef ENABLE_PRECONNECTED_POOL int tryToGetPreconnection(); int getPoolIndex(); void setPoolIndex(int index); #endif private: enum ConnectNodeConstValue { RetryConnectCount = 4, ConnectTimerIntervalMs = 30, SampleRate16K = 16000, Buffer8kMaxLimit = 16000, Buffer16kMaxLimit = 32000, NodeFrameSize = 2048, }; /* 1. about pointer and status of this node */ /* 1.1. something about instance&request&eventThread of this node */ NlsClientImpl *_instance; /* setting WorkThread of this node*/ WorkThread *_eventThread; /* setting request of this node*/ INlsRequest *_request; /* 1.2. event point of launching node */ struct event *_launchEvent; #ifdef ENABLE_PRECONNECTED_POOL struct event *_startWithPoolEvent; int _poolIndex; #endif struct event *_singleRoundTextEvent; /* 1.3. something about status of this node */ bool _isStop; bool _isFirstBinaryFrame; /* design to record work status */ ConnectStatus _workStatus; /* design to record exit status */ ExitStatus _exitStatus; /* design to record wakeup status */ bool _isWakeStop; /* about status of node in destroy */ bool _isDestroy; /* design to long connection */ bool _isConnected; bool _isLongConnection; /* design to preconnection */ bool _usePreconnection; /* 正在使用ConnectedPool机制 */ bool _isPreconnecting; /* 用于标记此Node进行交互并存储到ConnectedPool */ bool _isPreNodeStartStepByStep; /* 预创建的预连接节点, 分步启动 */ /* 1.4. about error */ int getErrorCodeFromMsg(const char *msg); std::string _nodeErrMsg; int _nodeErrCode; /* 1.5. about UUID */ std::string _nodeUUID; /* 2. command operation of request */ /* 2.1. run command */ std::string getCmdTypeString(int type); /* 2.2. evBuffer for command */ size_t _limitSize; struct evbuffer *_readEvBuffer; struct evbuffer *_binaryEvBuffer; struct evbuffer *_cmdEvBuffer; struct evbuffer *_wwvEvBuffer; /* 2.3. ev for command */ struct event *_connectEvent; struct event *_readEvent; struct event *_writeEvent; /* 3. send command and audio data */ /* 3.1. send audio data */ int addRemainAudioData(); /* 3.2. parse&send request */ bool parseUrlInformation(char *ip); int socketWrite(const uint8_t *buffer, size_t len); int nlsSend(const uint8_t *frame, size_t length); /* 4. recv response and parse */ int socketRead(uint8_t *buffer, size_t len); int nlsReceive(uint8_t *buffer, int max_size); NlsEvent *convertResult(WebSocketFrame *frame, int *result); int parseFrame(WebSocketFrame *wsFrame); /* 5. something about network */ bool checkConnectCount(); /* about socket connection */ urlAddress _url; evutil_socket_t _socketFd; SSLconnect *_sslHandle; // 此Node正在工作的SSL SSLconnect *_nativeSslHandle; // 此Node刚创建时的SSL, 单纯标记用, 不可释放 /* parameters about network */ int _aiFamily; struct sockaddr_in _addrV4; struct sockaddr_in6 _addrV6; WebSocketTcp _webSocket; WebSocketHeaderType _wsType; struct evdns_getaddrinfo_request *_dnsRequest; size_t _retryConnectCount; /*try count of connection*/ struct timeval _connectTv; bool _enableRecvTv; struct timeval _recvTv; struct timeval _sendTv; #ifdef ENABLE_HIGH_EFFICIENCY struct timeval _connectTimerTv; struct event *_connectTimerEvent; bool _connectTimerFlag; #endif /* 6. exit operation */ const char *genCloseMsg(std::string *buf_str); void closeStatusConnectNode(); void closeStatusConnectNodeForConnectedPool(); /* 7. something about other modules */ /* about audio data encoder */ NlsEncoder *_nlsEncoder; ENCODER_TYPE _encoderType; uint8_t *_audioFrame; int _audioFrameSize; int _maxFrameSize; bool _isFirstAudioFrame; /* 9. design for thread safe */ void waitEventCallback(); /* 10. design for sync call */ void sendFinishCondSignal(NlsEvent::EventType eventType); #if defined(_MSC_VER) HANDLE _mtxInvokeSyncCallNode; #else pthread_mutex_t _mtxInvokeSyncCallNode; pthread_cond_t _cvInvokeSyncCallNode; /*调用过程中等待调用结束*/ #endif unsigned int _syncCallTimeoutMs; /* 11. about listener */ void handlerEvent(const char *error, int errorCode, NlsEvent::EventType eventType, bool ignore = false); void handlerMessage(const char *response, NlsEvent::EventType eventType); int handlerFrame(NlsEvent *frameEvent); HandleBaseOneParamWithReturnVoid<NlsEvent> *_handler; /*callback listener*/ bool _enableOnMessage; #ifdef ENABLE_REQUEST_RECORDING /* 12. design for recording process */ std::string replenishNodeProcess(const char *error); Json::Value updateNodeProcess4Data(); Json::Value updateNodeProcess4Last(); Json::Value updateNodeProcess4Timestamp(); Json::Value updateNodeProcess4Callback(); Json::Value updateNodeProcess4Block(); struct NodeProcess _nodeProcess; #endif #ifdef ENABLE_CONTINUED /* 13. design for reconnection automatically */ Json::Value updateNodeReconnection(); void updateTwIndexOffset(NlsEvent *frameEvent); bool nodeReconnecting(); struct event *_reconnectEvent; #endif bool ignoreCallbackWhenReconnecting(NlsEvent::EventType eventType, int code); /* 14. others */ const char *genSynthesisStartedMsg(); }; } // namespace AlibabaNls #endif // NLS_SDK_CONNECT_NODE_H