in libftl/handshake.c [433:537]
/**
 * Thread routine that watches the ingest control connection.
 *
 * While the FTL_CXN_STATUS_THRD state bit is set, the loop wakes up at least
 * every STATUS_THREAD_SLEEP_TIME_MS (or earlier when the shutdown semaphore
 * is signalled), consumes any pending response from the ingest socket and
 * tracks when the last ping response was seen. If a non-ping response
 * arrives, the socket query fails, or no ping has been seen for
 * 6 * KEEPALIVE_FREQUENCY_MS, the thread clears its own state bit, attempts
 * to disconnect from the ingest and enqueues a DISCONNECTED status event.
 *
 * @param data Pointer to the ftl_stream_configuration_private_t for this
 *             connection (passed as void * by the thread API).
 * @return Always 0.
 */
OS_THREAD_ROUTINE connection_status_thread(void *data)
{
    ftl_stream_configuration_private_t *ftl = (ftl_stream_configuration_private_t *)data;
    char buf[1024];
    ftl_status_msg_t status;
    struct timeval last_ping, now;
    int64_t ms_since_ping = 0;

    // Per the original author's note: pings are sent every 5 seconds, but the
    // connection is not timed out until 30 seconds have passed without hearing
    // anything back from the ingest. This is high, but on some poor networks
    // such gaps can happen.
    int keepalive_is_late = 6 * KEEPALIVE_FREQUENCY_MS;

    // Treat thread start as the time of the "last ping" so the timeout is
    // measured from now.
    gettimeofday(&last_ping, NULL);

    // Loop while the connection status thread should be alive.
    while (ftl_get_state(ftl, FTL_CXN_STATUS_THRD)) {
        // Wait on the shutdown event for at most STATUS_THREAD_SLEEP_TIME_MS.
        os_semaphore_pend(&ftl->connection_thread_shutdown, STATUS_THREAD_SLEEP_TIME_MS);

        // Re-check after waking: the state may have been cleared while we waited.
        if (!ftl_get_state(ftl, FTL_CXN_STATUS_THRD))
        {
            break;
        }

        ftl_status_t error_code = FTL_SUCCESS;

        // Check if there is any data for us to consume.
        unsigned long bytesAvailable = 0;
        int ret = get_socket_bytes_available(ftl->ingest_socket, &bytesAvailable);
        if (ret < 0)
        {
            FTL_LOG(ftl, FTL_LOG_ERROR, "Failed to call get_socket_bytes_available, %s", get_socket_error());
            error_code = FTL_UNKNOWN_ERROR_CODE;
        }
        else
        {
            // If we have data waiting, consume it now.
            if (bytesAvailable > 0)
            {
                int resp_code = _ftl_get_response(ftl, buf, sizeof(buf));

                // If we got a ping response, mark the time and loop again.
                if (resp_code == FTL_INGEST_RESP_PING) {
                    gettimeofday(&last_ping, NULL);
                    continue;
                }

                // Anything else is passed to _log_response, whose result
                // becomes the error code for this iteration.
                error_code = _log_response(ftl, resp_code);
            }
        }

        // If we don't have an error, check if the ping has timed out.
        if (error_code == FTL_SUCCESS)
        {
            // Get the current time and figure out the time since the last ping was received.
            gettimeofday(&now, NULL);
            ms_since_ping = timeval_subtract_to_ms(&now, &last_ping);
            if (ms_since_ping < keepalive_is_late) {
                continue;
            }

            // Otherwise, we haven't gotten the ping in too long.
            // ms_since_ping is an int64_t, so it is cast to long long and
            // printed with %lld; passing it to %d is undefined behaviour.
            FTL_LOG(ftl, FTL_LOG_ERROR, "Ingest ping timeout, we haven't gotten a ping in %lld ms.", (long long)ms_since_ping);
            error_code = FTL_NO_PING_RESPONSE;
        }

        // At this point something is wrong, and we are going to shut down the
        // connection. Do one more check that we should still be running.
        if (!ftl_get_state(ftl, FTL_CXN_STATUS_THRD))
        {
            break;
        }

        FTL_LOG(ftl, FTL_LOG_ERROR, "Ingest connection has dropped: error code %d\n", error_code);

        // Clear the state that this thread is running. If we don't do this we
        // will deadlock in internal_ingest_disconnect.
        ftl_clear_state(ftl, FTL_CXN_STATUS_THRD);

        // Shut down the ingest connection, but only when os_trylock_mutex
        // returns non-zero (presumably meaning the lock was acquired -- confirm
        // against the os_trylock_mutex implementation).
        if (os_trylock_mutex(&ftl->disconnect_mutex)) {
            internal_ingest_disconnect(ftl);
            os_unlock_mutex(&ftl->disconnect_mutex);
        }

        // Fire an event indicating we shut down.
        status.type = FTL_STATUS_EVENT;
        if (error_code == FTL_NO_MEDIA_TIMEOUT) {
            status.msg.event.reason = FTL_STATUS_EVENT_REASON_NO_MEDIA;
        }
        else {
            status.msg.event.reason = FTL_STATUS_EVENT_REASON_UNKNOWN;
        }
        status.msg.event.type = FTL_STATUS_EVENT_TYPE_DISCONNECTED;
        status.msg.event.error_code = error_code;
        enqueue_status_msg(ftl, &status);

        // Exit the loop.
        break;
    }

    FTL_LOG(ftl, FTL_LOG_INFO, "Exited connection_status_thread");
    return 0;
}