in src/source/Ice/ConnectionListener.c [230:390]
/**
 * Thread routine that polls the listener's sockets and dispatches received data.
 *
 * Until pConnectionListener->terminate is set, each pass:
 *   1. snapshots the non-closed socket connections under the listener lock, marks
 *      them as in use and drops closed ones from the listener,
 *   2. polls the snapshot for readable data with a bounded timeout,
 *   3. drains every readable socket with recvfrom() and hands the payload
 *      (post-processed by socketConnectionReadData) to the connection's
 *      dataAvailableCallbackFn,
 *   4. clears the in-use marks.
 *
 * @param arg - the PConnectionListener to serve, passed as an opaque thread argument.
 *
 * @return the final STATUS cast to PVOID; STATUS_NULL_ARG when arg is NULL.
 */
PVOID connectionListenerReceiveDataRoutine(PVOID arg)
{
    STATUS retStatus = STATUS_SUCCESS;
    PConnectionListener pConnectionListener = (PConnectionListener) arg;
    PSocketConnection pSocketConnection;
    BOOL iterate = TRUE;
    PSocketConnection sockets[CONNECTION_LISTENER_DEFAULT_MAX_LISTENING_CONNECTION];
    // socketCount doubles as "number of sockets currently marked in use by this thread",
    // so it must be 0 whenever nothing is marked (including on the early NULL-arg exit)
    UINT32 i, socketCount = 0;
    UINT32 dataLen;
    INT32 nfds = 0;
    struct pollfd rfds[CONNECTION_LISTENER_DEFAULT_MAX_LISTENING_CONNECTION];
    INT32 retval, localSocket, recvErrorCode;
    INT64 readLen;
    // the source address is put here. sockaddr_storage can hold either sockaddr_in or sockaddr_in6
    struct sockaddr_storage srcAddrBuff;
    socklen_t srcAddrBuffLen = SIZEOF(srcAddrBuff);
    struct sockaddr_in* pIpv4Addr;
    struct sockaddr_in6* pIpv6Addr;
    KvsIpAddress srcAddr;
    PKvsIpAddress pSrcAddr = NULL;

    CHK(pConnectionListener != NULL, STATUS_NULL_ARG);

    // Zero the poll set up front so memory sanitizers treat it as initialized
    MEMSET(&rfds, 0x00, SIZEOF(rfds));

    // Zero the source address so the callback never observes indeterminate bytes,
    // e.g. when a datagram arrives with an address family other than IPv4/IPv6
    MEMSET(&srcAddr, 0x00, SIZEOF(srcAddr));
    srcAddr.isPointToPoint = FALSE;

    while (!ATOMIC_LOAD_BOOL(&pConnectionListener->terminate)) {
        nfds = 0;

        // Perform the socket connection gathering under the lock
        // NOTE: There is no cleanup jump from the lock/unlock block
        // so we don't need to use a boolean indicator whether locked
        MUTEX_LOCK(pConnectionListener->lock);
        for (i = 0, socketCount = 0; i < CONNECTION_LISTENER_DEFAULT_MAX_LISTENING_CONNECTION; i++) {
            pSocketConnection = pConnectionListener->sockets[i];
            if (pSocketConnection != NULL) {
                if (!socketConnectionIsClosed(pSocketConnection)) {
                    MUTEX_LOCK(pSocketConnection->lock);
                    localSocket = pSocketConnection->localSocket;
                    MUTEX_UNLOCK(pSocketConnection->lock);

                    rfds[nfds].fd = localSocket;
                    rfds[nfds].events = POLLIN | POLLPRI;
                    rfds[nfds].revents = 0;
                    nfds++;

                    // Store the sockets locally while in use and mark it as in use
                    sockets[socketCount++] = pSocketConnection;
                    ATOMIC_STORE_BOOL(&pSocketConnection->inUse, TRUE);
                } else {
                    // Remove the connection
                    pConnectionListener->sockets[i] = NULL;
                    pConnectionListener->socketCount--;
                }
            }
        }

        // Need to unlock the mutex to ensure other racing threads unblock
        MUTEX_UNLOCK(pConnectionListener->lock);

        // blocking call until resolves as a timeout, an error, a signal or data received
        retval = POLL(rfds, nfds, CONNECTION_LISTENER_SOCKET_WAIT_FOR_DATA_TIMEOUT / HUNDREDS_OF_NANOS_IN_A_MILLISECOND);

        // In case of 0 we have a timeout and should re-lock to allow for other
        // interlocking operations to proceed. A positive return means we received data
        if (retval == -1) {
            DLOGW("poll() failed with errno %s", getErrorString(getErrorCode()));
        } else if (retval > 0) {
            for (i = 0; i < socketCount; i++) {
                pSocketConnection = sockets[i];
                if (!socketConnectionIsClosed(pSocketConnection)) {
                    MUTEX_LOCK(pSocketConnection->lock);
                    localSocket = pSocketConnection->localSocket;
                    MUTEX_UNLOCK(pSocketConnection->lock);

                    if (canReadFd(localSocket, rfds, nfds)) {
                        // Keep reading until the socket reports an error or end of stream
                        iterate = TRUE;
                        while (iterate) {
                            readLen = recvfrom(localSocket, pConnectionListener->pBuffer, pConnectionListener->bufferLen, 0,
                                               (struct sockaddr*) &srcAddrBuff, &srcAddrBuffLen);
                            if (readLen < 0) {
                                // Capture the error right away: the close call below may overwrite it
                                recvErrorCode = getErrorCode();
                                switch (recvErrorCode) {
                                    case EWOULDBLOCK:
                                        break;
                                    default:
                                        /* on any other error, close connection */
                                        DLOGD("recvfrom() failed with errno %s for socket %d", getErrorString(recvErrorCode), localSocket);
                                        CHK_STATUS(socketConnectionClosed(pSocketConnection));
                                        break;
                                }

                                iterate = FALSE;
                            } else if (readLen == 0) {
                                CHK_STATUS(socketConnectionClosed(pSocketConnection));
                                iterate = FALSE;
                            } else {
                                /* readLen > 0 */
                                // Hand the callee a genuine UINT32 instead of aliasing the INT64 readLen.
                                // It is seeded with the received byte count, which is what the callee
                                // could previously observe through the pointer on little-endian targets.
                                dataLen = (UINT32) readLen;

                                if (ATOMIC_LOAD_BOOL(&pSocketConnection->receiveData) && pSocketConnection->dataAvailableCallbackFn != NULL &&
                                    /* data could be encrypted so they need to be decrypted through socketConnectionReadData
                                     * and get the decrypted data length. */
                                    STATUS_SUCCEEDED(socketConnectionReadData(pSocketConnection, pConnectionListener->pBuffer,
                                                                              pConnectionListener->bufferLen, &dataLen))) {
                                    if (pSocketConnection->protocol == KVS_SOCKET_PROTOCOL_UDP) {
                                        if (srcAddrBuff.ss_family == AF_INET) {
                                            srcAddr.family = KVS_IP_FAMILY_TYPE_IPV4;
                                            pIpv4Addr = (struct sockaddr_in*) &srcAddrBuff;
                                            MEMCPY(srcAddr.address, (PBYTE) &pIpv4Addr->sin_addr, IPV4_ADDRESS_LENGTH);
                                            srcAddr.port = pIpv4Addr->sin_port;
                                        } else if (srcAddrBuff.ss_family == AF_INET6) {
                                            srcAddr.family = KVS_IP_FAMILY_TYPE_IPV6;
                                            pIpv6Addr = (struct sockaddr_in6*) &srcAddrBuff;
                                            MEMCPY(srcAddr.address, (PBYTE) &pIpv6Addr->sin6_addr, IPV6_ADDRESS_LENGTH);
                                            srcAddr.port = pIpv6Addr->sin6_port;
                                        }
                                        // NOTE(review): for any other family srcAddr keeps its previous contents
                                        pSrcAddr = &srcAddr;
                                    } else {
                                        // srcAddr is ignored in TCP callback handlers
                                        pSrcAddr = NULL;
                                    }

                                    // dataLen may be 0 if SSL does not emit any application data.
                                    // in that case, no need to call dataAvailable callback
                                    if (dataLen > 0) {
                                        pSocketConnection->dataAvailableCallbackFn(pSocketConnection->dataAvailableCallbackCustomData, pSocketConnection,
                                                                                   pConnectionListener->pBuffer, dataLen, pSrcAddr,
                                                                                   NULL); // no dest information available right now.
                                    }
                                }
                            }

                            // reset srcAddrBuffLen to actual size
                            srcAddrBuffLen = SIZEOF(srcAddrBuff);
                        }
                    }
                }
            }
        }

        // Mark as unused
        for (i = 0; i < socketCount; i++) {
            ATOMIC_STORE_BOOL(&sockets[i]->inUse, FALSE);
        }

        // Nothing is marked any more; keeps the CleanUp pass from touching released sockets
        socketCount = 0;
    }

CleanUp:

    // An error jump out of the receive loop skips the "mark as unused" pass above.
    // Release whatever is still marked so the flags are not left set forever.
    for (i = 0; i < socketCount; i++) {
        ATOMIC_STORE_BOOL(&sockets[i]->inUse, FALSE);
    }

    // The check for valid mutex is necessary because when we're in freeConnectionListener
    // we may free the mutex in another thread so by the time we get here accessing the lock
    // will result in accessing a resource after it has been freed
    if (pConnectionListener != NULL && IS_VALID_MUTEX_VALUE(pConnectionListener->lock)) {
        // As TID is 64 bit we can't atomically update it and need to do it under the lock
        MUTEX_LOCK(pConnectionListener->lock);
        pConnectionListener->receiveDataRoutine = INVALID_TID_VALUE;
        MUTEX_UNLOCK(pConnectionListener->lock);
    }

    CHK_LOG_ERR(retStatus);

    return (PVOID) (ULONG_PTR) retStatus;
}