PVOID connectionListenerReceiveDataRoutine()

in src/source/Ice/ConnectionListener.c [230:390]


PVOID connectionListenerReceiveDataRoutine(PVOID arg)
{
    STATUS retStatus = STATUS_SUCCESS;
    PConnectionListener pConnectionListener = (PConnectionListener) arg;
    PSocketConnection pSocketConnection;
    BOOL iterate = TRUE;
    PSocketConnection sockets[CONNECTION_LISTENER_DEFAULT_MAX_LISTENING_CONNECTION];
    UINT32 i, socketCount;

    INT32 nfds = 0;
    struct pollfd rfds[CONNECTION_LISTENER_DEFAULT_MAX_LISTENING_CONNECTION];
    INT32 retval, localSocket;
    INT64 readLen;
    // the source address is put here. sockaddr_storage can hold either sockaddr_in or sockaddr_in6
    struct sockaddr_storage srcAddrBuff;
    socklen_t srcAddrBuffLen = SIZEOF(srcAddrBuff);
    struct sockaddr_in* pIpv4Addr;
    struct sockaddr_in6* pIpv6Addr;
    KvsIpAddress srcAddr;
    PKvsIpAddress pSrcAddr = NULL;

    CHK(pConnectionListener != NULL, STATUS_NULL_ARG);

    /* Ensure that memory sanitizers consider
     * rfds initialized even if FD_ZERO is
     * implemented in assembly. */
    MEMSET(&rfds, 0x00, SIZEOF(rfds));

    srcAddr.isPointToPoint = FALSE;

    while (!ATOMIC_LOAD_BOOL(&pConnectionListener->terminate)) {
        nfds = 0;

        // Perform the socket connection gathering under the lock
        // NOTE: There is no cleanup jump from the lock/unlock block
        // so we don't need to use a boolean indicator whether locked
        MUTEX_LOCK(pConnectionListener->lock);
        for (i = 0, socketCount = 0; i < CONNECTION_LISTENER_DEFAULT_MAX_LISTENING_CONNECTION; i++) {
            pSocketConnection = pConnectionListener->sockets[i];
            if (pSocketConnection != NULL) {
                if (!socketConnectionIsClosed(pSocketConnection)) {
                    MUTEX_LOCK(pSocketConnection->lock);
                    localSocket = pSocketConnection->localSocket;
                    MUTEX_UNLOCK(pSocketConnection->lock);
                    rfds[nfds].fd = localSocket;
                    rfds[nfds].events = POLLIN | POLLPRI;
                    rfds[nfds].revents = 0;
                    nfds++;

                    // Store the sockets locally while in use and mark it as in use
                    sockets[socketCount++] = pSocketConnection;
                    ATOMIC_STORE_BOOL(&pSocketConnection->inUse, TRUE);
                } else {
                    // Remove the connection
                    pConnectionListener->sockets[i] = NULL;
                    pConnectionListener->socketCount--;
                }
            }
        }

        // Need to unlock the mutex to ensure other racing threads unblock
        MUTEX_UNLOCK(pConnectionListener->lock);

        // blocking call until resolves as a timeout, an error, a signal or data received
        retval = POLL(rfds, nfds, CONNECTION_LISTENER_SOCKET_WAIT_FOR_DATA_TIMEOUT / HUNDREDS_OF_NANOS_IN_A_MILLISECOND);

        // In case of 0 we have a timeout and should re-lock to allow for other
        // interlocking operations to proceed. A positive return means we received data
        if (retval == -1) {
            DLOGW("poll() failed with errno %s", getErrorString(getErrorCode()));
        } else if (retval > 0) {
            for (i = 0; i < socketCount; i++) {
                pSocketConnection = sockets[i];
                if (!socketConnectionIsClosed(pSocketConnection)) {
                    MUTEX_LOCK(pSocketConnection->lock);
                    localSocket = pSocketConnection->localSocket;
                    MUTEX_UNLOCK(pSocketConnection->lock);

                    if (canReadFd(localSocket, rfds, nfds)) {
                        iterate = TRUE;
                        while (iterate) {
                            readLen = recvfrom(localSocket, pConnectionListener->pBuffer, pConnectionListener->bufferLen, 0,
                                               (struct sockaddr*) &srcAddrBuff, &srcAddrBuffLen);
                            if (readLen < 0) {
                                switch (getErrorCode()) {
                                    case EWOULDBLOCK:
                                        break;
                                    default:
                                        /* on any other error, close connection */
                                        CHK_STATUS(socketConnectionClosed(pSocketConnection));
                                        DLOGD("recvfrom() failed with errno %s for socket %d", getErrorString(getErrorCode()), localSocket);
                                        break;
                                }

                                iterate = FALSE;
                            } else if (readLen == 0) {
                                CHK_STATUS(socketConnectionClosed(pSocketConnection));
                                iterate = FALSE;
                            } else if (/* readLen > 0 */
                                       ATOMIC_LOAD_BOOL(&pSocketConnection->receiveData) && pSocketConnection->dataAvailableCallbackFn != NULL &&
                                       /* data could be encrypted so they need to be decrypted through socketConnectionReadData
                                        * and get the decrypted data length. */
                                       STATUS_SUCCEEDED(socketConnectionReadData(pSocketConnection, pConnectionListener->pBuffer,
                                                                                 pConnectionListener->bufferLen, (PUINT32) &readLen))) {
                                if (pSocketConnection->protocol == KVS_SOCKET_PROTOCOL_UDP) {
                                    if (srcAddrBuff.ss_family == AF_INET) {
                                        srcAddr.family = KVS_IP_FAMILY_TYPE_IPV4;
                                        pIpv4Addr = (struct sockaddr_in*) &srcAddrBuff;
                                        MEMCPY(srcAddr.address, (PBYTE) &pIpv4Addr->sin_addr, IPV4_ADDRESS_LENGTH);
                                        srcAddr.port = pIpv4Addr->sin_port;
                                    } else if (srcAddrBuff.ss_family == AF_INET6) {
                                        srcAddr.family = KVS_IP_FAMILY_TYPE_IPV6;
                                        pIpv6Addr = (struct sockaddr_in6*) &srcAddrBuff;
                                        MEMCPY(srcAddr.address, (PBYTE) &pIpv6Addr->sin6_addr, IPV6_ADDRESS_LENGTH);
                                        srcAddr.port = pIpv6Addr->sin6_port;
                                    }
                                    pSrcAddr = &srcAddr;
                                } else {
                                    // srcAddr is ignored in TCP callback handlers
                                    pSrcAddr = NULL;
                                }

                                // readLen may be 0 if SSL does not emit any application data.
                                // in that case, no need to call dataAvailable callback
                                if (readLen > 0) {
                                    pSocketConnection->dataAvailableCallbackFn(pSocketConnection->dataAvailableCallbackCustomData, pSocketConnection,
                                                                               pConnectionListener->pBuffer, (UINT32) readLen, pSrcAddr,
                                                                               NULL); // no dest information available right now.
                                }
                            }

                            // reset srcAddrBuffLen to actual size
                            srcAddrBuffLen = SIZEOF(srcAddrBuff);
                        }
                    }
                }
            }
        }

        // Mark as unused
        for (i = 0; i < socketCount; i++) {
            ATOMIC_STORE_BOOL(&sockets[i]->inUse, FALSE);
        }
    }

CleanUp:

    // The check for valid mutex is necessary because when we're in freeConnectionListener
    // we may free the mutex in another thread so by the time we get here accessing the lock
    // will result in accessing a resource after it has been freed
    if (pConnectionListener != NULL && IS_VALID_MUTEX_VALUE(pConnectionListener->lock)) {
        // As TID is 64 bit we can't atomically update it and need to do it under the lock
        MUTEX_LOCK(pConnectionListener->lock);
        pConnectionListener->receiveDataRoutine = INVALID_TID_VALUE;
        MUTEX_UNLOCK(pConnectionListener->lock);
    }

    CHK_LOG_ERR(retStatus);

    return (PVOID) (ULONG_PTR) retStatus;
}