in libftl/media.c [1051:1195]
/* Decode a big-endian (network order) 16-bit value from a byte pointer.
 * Works on unaligned data and avoids type-punning the receive buffer. */
static uint16_t recv_read_be16(const unsigned char *p)
{
    return (uint16_t)(((unsigned int)p[0] << 8) | (unsigned int)p[1]);
}

/* Decode a big-endian (network order) 32-bit value from a byte pointer. */
static uint32_t recv_read_be32(const unsigned char *p)
{
    return ((uint32_t)p[0] << 24) | ((uint32_t)p[1] << 16) |
           ((uint32_t)p[2] << 8) | (uint32_t)p[3];
}

/*
 * Receive thread for the media socket.
 *
 * Loops while ftl_get_state(ftl, FTL_RX_THRD) is set, polling the media
 * socket with a 50 ms timeout so the state flag is re-checked regularly.
 * Datagrams whose source address does not match ftl->ingest_ip are dropped.
 * Two feedback packet kinds (both with feedback type 1) are handled:
 *   - payload type 205: a NACK list; every requested sequence number is
 *     passed to _nack_resend_packet(). The layout parsed here (12 byte
 *     header followed by {sequence base, 16-bit bitmask} pairs) matches the
 *     RFC 4585 generic NACK format.
 *   - payload type PING_PTYPE: an echoed ping_pkt_t; the round trip time is
 *     computed from its xmit_time and folded into the video RTT statistics.
 *
 * data: pointer to the ftl_stream_configuration_private_t for this stream.
 *
 * Returns (OS_THREAD_TYPE)-1 if the receive buffer cannot be allocated,
 * otherwise (OS_THREAD_TYPE)0 once the loop exits.
 */
OS_THREAD_ROUTINE recv_thread(void *data)
{
    ftl_stream_configuration_private_t *ftl = (ftl_stream_configuration_private_t *)data;
    ftl_media_config_t *media = &ftl->media;
    int ret;
    unsigned char *buf;
    struct sockaddr_in6 ipv6_addrinfo;
    struct sockaddr_in ipv4_addrinfo;
    struct sockaddr *addrinfo;
    socklen_t addr_len, addrinfo_len;
    char remote_ip[IPVX_ADDR_ASCII_LEN];

    /* Pick address storage matching the socket family in use. */
    if (ftl->socket_family == AF_INET) {
        addrinfo = (struct sockaddr *)&ipv4_addrinfo;
        addrinfo_len = sizeof(struct sockaddr_in);
    }
    else {
        addrinfo = (struct sockaddr *)&ipv6_addrinfo;
        addrinfo_len = sizeof(struct sockaddr_in6);
    }

#ifdef _WIN32
    if (!SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_TIME_CRITICAL)) {
        FTL_LOG(ftl, FTL_LOG_WARN, "Failed to set recv_thread priority to THREAD_PRIORITY_TIME_CRITICAL\n");
    }
#endif

    if ((buf = (unsigned char *)malloc(MAX_PACKET_BUFFER)) == NULL) {
        FTL_LOG(ftl, FTL_LOG_ERROR, "Failed to allocate recv buffer\n");
        return (OS_THREAD_TYPE)-1;
    }

    while (ftl_get_state(ftl, FTL_RX_THRD)) {
        // Wait on the socket for data or a timeout. The timeout is how we
        // exit the thread when disconnecting.
        ret = poll_socket_for_receive(media->media_socket, 50);
        if (ret == 0) {
            // This is a timeout, this is perfectly fine.
            continue;
        }
        else if (ret < 0) {
            // We hit an error.
            FTL_LOG(ftl, FTL_LOG_INFO, "Receive thread socket error on poll");
            continue;
        }

        // We have data on the socket, read it.
        // addr_len is in/out for recvfrom, so reset it before every call.
        addr_len = addrinfo_len;
        ret = recvfrom(media->media_socket, buf, MAX_PACKET_BUFFER, 0, (struct sockaddr *)addrinfo, &addr_len);
        if (ret <= 0) {
            // This shouldn't be possible, we should only be here if poll above told us there was data.
            FTL_LOG(ftl, FTL_LOG_INFO, "recv from failed with %s\n", get_socket_error());
            continue;
        }

        if (_get_remote_ip(addrinfo, addr_len, remote_ip, sizeof(remote_ip)) < 0) {
            continue;
        }

        // Only accept feedback coming from the ingest we are streaming to.
        if (strcmp(remote_ip, ftl->ingest_ip) != 0) {
            FTL_LOG(ftl, FTL_LOG_WARN, "Discarded packet from unexpected ip: %s\n", remote_ip);
            continue;
        }

        int feedbackType, ptype, length;
        int recv_len = ret;

        // The first two bytes carry the feedback type and payload type.
        if (recv_len < 2) {
            FTL_LOG(ftl, FTL_LOG_WARN, "recv packet too small to parse, discarding\n");
            continue;
        }

        /* extract the feedback header */
        feedbackType = buf[0] & 0x1F;
        ptype = buf[1];

        if (feedbackType == 1 && ptype == 205) {
            uint32_t ssrcMedia;
            uint16_t snBase, blp, sn;
            const unsigned char *p;
            int packet_len;
            int fci;

            // The fixed header (flags, type, length, two SSRCs) is 12 bytes;
            // never parse fields that were not actually received, since the
            // buffer is reused and would otherwise expose stale bytes.
            if (recv_len < 12) {
                FTL_LOG(ftl, FTL_LOG_WARN, "recv packet too small to parse, discarding\n");
                continue;
            }

            // The length field counts 32-bit words minus one.
            length = recv_read_be16(buf + 2);
            packet_len = (length + 1) * 4;
            if (recv_len < packet_len) {
                FTL_LOG(ftl, FTL_LOG_WARN, "reported len was %d but packet is only %d...discarding\n", packet_len, recv_len);
                continue;
            }

            ssrcMedia = recv_read_be32(buf + 8);

            // Each feedback entry is 4 bytes: a base sequence number followed
            // by a bitmask of the 16 sequence numbers after it. The header
            // occupies 3 words, leaving (length - 2) entries.
            p = buf + 12;
            for (fci = 0; fci < (length - 2); fci++) {
                // request the first sequence number
                snBase = recv_read_be16(p);
                p += 2;
                _nack_resend_packet(ftl, ssrcMedia, snBase);

                blp = recv_read_be16(p);
                p += 2;
                if (blp) {
                    int i;
                    for (i = 0; i < 16; i++) {
                        if ((blp & (1 << i)) != 0) {
                            // bit i refers to snBase + i + 1 (wraps at 16 bits)
                            sn = (uint16_t)(snBase + i + 1);
                            _nack_resend_packet(ftl, ssrcMedia, sn);
                        }
                    }
                }
            }
        }
        else if (feedbackType == 1 && ptype == PING_PTYPE) {
            ping_pkt_t *ping;
            struct timeval now;
            int delay_ms;
            media_stats_t *pkt_stats = &ftl->video.media_component.stats;

            // A truncated ping would make us read a stale transmit time out
            // of the reused buffer and corrupt the RTT statistics.
            if (recv_len < (int)sizeof(ping_pkt_t)) {
                FTL_LOG(ftl, FTL_LOG_WARN, "ping packet too small to parse, discarding\n");
                continue;
            }

            ping = (ping_pkt_t *)buf;
            gettimeofday(&now, NULL);
            delay_ms = (int)timeval_subtract_to_ms(&now, &ping->xmit_time);

            // Track max and min independently: a single sample (notably the
            // first one) may be both a new maximum and a new minimum.
            if (delay_ms > pkt_stats->pkt_rtt_max) {
                pkt_stats->pkt_rtt_max = delay_ms;
            }
            if (delay_ms < pkt_stats->pkt_rtt_min) {
                pkt_stats->pkt_rtt_min = delay_ms;
            }

            pkt_stats->total_rtt += delay_ms;
            pkt_stats->rtt_samples++;
            ftl->media.last_rtt_delay = delay_ms;
        }
    }

    free(buf);
    FTL_LOG(ftl, FTL_LOG_INFO, "Exited Recv Thread\n");
    return (OS_THREAD_TYPE)0;
}