OS_THREAD_ROUTINE connection_status_thread()

in libftl/handshake.c [433:537]


OS_THREAD_ROUTINE connection_status_thread(void *data)
{
    ftl_stream_configuration_private_t *ftl = (ftl_stream_configuration_private_t *)data;
    char buf[1024];
    ftl_status_msg_t status;
    struct timeval last_ping, now;
    int64_t ms_since_ping = 0;

    // We ping every 5 seconds, but don't timeout the connection until 30 seconds has passed
    // without hearing anything back from the ingest. This time is high, but some some poor networks
    // this can happen.
    int keepalive_is_late = 6 * KEEPALIVE_FREQUENCY_MS; 

    gettimeofday(&last_ping, NULL);

    // Loop while the connection status thread should be alive.
    while (ftl_get_state(ftl, FTL_CXN_STATUS_THRD)) {

        // Wait on the shutdown event for at most STATUS_THREAD_SLEEP_TIME_MS
        os_semaphore_pend(&ftl->connection_thread_shutdown, STATUS_THREAD_SLEEP_TIME_MS);
        if (!ftl_get_state(ftl, FTL_CXN_STATUS_THRD))
        {
            break;
        }

        ftl_status_t error_code = FTL_SUCCESS;

        // Check if there is any data for us to consume.
        unsigned long bytesAvailable = 0;
        int ret = get_socket_bytes_available(ftl->ingest_socket, &bytesAvailable);
        if (ret < 0)
        {
            FTL_LOG(ftl, FTL_LOG_ERROR, "Failed to call get_socket_bytes_available, %s", get_socket_error());
            error_code = FTL_UNKNOWN_ERROR_CODE;
        }
        else
        {
            // If we have data waiting, consume it now.
            if (bytesAvailable > 0)
            {
                int resp_code = _ftl_get_response(ftl, buf, sizeof(buf));

                // If we got a ping response, mark the time and loop again.
                if (resp_code  == FTL_INGEST_RESP_PING) {
                    gettimeofday(&last_ping, NULL);
                    continue;
                }

                // If it's anything else, it's an error.
                error_code = _log_response(ftl, resp_code);
            }
        }

        // If we don't have an error, check if the ping has timed out.
        if (error_code == FTL_SUCCESS)
        {
            // Get the current time and figure out the time since the last ping was recieved.
            gettimeofday(&now, NULL);
            ms_since_ping = timeval_subtract_to_ms(&now, &last_ping);
            if (ms_since_ping < keepalive_is_late) {
                continue;
            }

            // Otherwise, we havn't gotten the ping in too long.
            FTL_LOG(ftl, FTL_LOG_ERROR, "Ingest ping timeout, we haven't gotten a ping in %d ms.", ms_since_ping);
            error_code = FTL_NO_PING_RESPONSE;
        }

        // At this point something is wrong, and we are going to shutdown the connection. Do one more check that we
        // should still be running.
        if (!ftl_get_state(ftl, FTL_CXN_STATUS_THRD))
        {
            break;
        }
        FTL_LOG(ftl, FTL_LOG_ERROR, "Ingest connection has dropped: error code %d\n", error_code);

        // Clear the state that this thread is running. If we don't do this we will dead lock
        // in the internal_ingest_disconnect.
        ftl_clear_state(ftl, FTL_CXN_STATUS_THRD);

        // Shutdown the ingest connection.
        if (os_trylock_mutex(&ftl->disconnect_mutex)) {
            internal_ingest_disconnect(ftl);
            os_unlock_mutex(&ftl->disconnect_mutex);
        }

        // Fire an event indicating we shutdown.
        status.type = FTL_STATUS_EVENT;
        if (error_code == FTL_NO_MEDIA_TIMEOUT) {
            status.msg.event.reason = FTL_STATUS_EVENT_REASON_NO_MEDIA;
        }
        else {
            status.msg.event.reason = FTL_STATUS_EVENT_REASON_UNKNOWN;
        }
        status.msg.event.type = FTL_STATUS_EVENT_TYPE_DISCONNECTED;
        status.msg.event.error_code = error_code;
        enqueue_status_msg(ftl, &status);

        // Exit the loop.
        break;		
    }

    FTL_LOG(ftl, FTL_LOG_INFO, "Exited connection_status_thread");
    return 0;
}