OS_THREAD_ROUTINE recv_thread()

in libftl/media.c [1051:1195]


OS_THREAD_ROUTINE recv_thread(void *data)
{
  ftl_stream_configuration_private_t *ftl = (ftl_stream_configuration_private_t *)data;
  ftl_media_config_t *media = &ftl->media;
  int ret;
  unsigned char *buf;
  struct sockaddr_in6 ipv6_addrinfo;
  struct sockaddr_in ipv4_addrinfo;
  struct sockaddr* addrinfo;
  socklen_t addr_len, addrinfo_len;
  char remote_ip[IPVX_ADDR_ASCII_LEN];

  if (ftl->socket_family == AF_INET) {
     addrinfo = (struct sockaddr *)&ipv4_addrinfo;
     addrinfo_len = sizeof(struct sockaddr_in);
  }
  else {
     addrinfo = (struct sockaddr *)&ipv6_addrinfo;
     addrinfo_len = sizeof(struct sockaddr_in6);
  }

#ifdef _WIN32
  if (!SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_TIME_CRITICAL)) {
    FTL_LOG(ftl, FTL_LOG_WARN, "Failed to set recv_thread priority to THREAD_PRIORITY_TIME_CRITICAL\n");
  }
#endif

  if ((buf = (unsigned char*)malloc(MAX_PACKET_BUFFER)) == NULL) {
    FTL_LOG(ftl, FTL_LOG_ERROR, "Failed to allocate recv buffer\n");
    return (OS_THREAD_TYPE)-1;
  }

  while (ftl_get_state(ftl, FTL_RX_THRD)) {

    // Wait on the socket for data or a timeout. The timeout is how we
    // exit the thread when disconnecting.
    ret = poll_socket_for_receive(media->media_socket, 50);
    if (ret == 0)
    {
      // This is a timeout, this is perfectly fine.
      continue;
    }
    else if (ret < 0)
    {
      // We hit an error.
      FTL_LOG(ftl, FTL_LOG_INFO, "Receive thread socket error on poll");
      continue;
    }

    // We have data on the socket, read it.
    addr_len = addrinfo_len;
    ret = recvfrom(media->media_socket, buf, MAX_PACKET_BUFFER, 0, (struct sockaddr *)addrinfo, &addr_len);
    if (ret <= 0) {
      // This shouldn't be possible, we should only be here is poll above told us there was data.
      FTL_LOG(ftl, FTL_LOG_INFO, "recv from failed with %s\n", get_socket_error());
      continue;
    }

    if (_get_remote_ip(addrinfo, addr_len, remote_ip, sizeof(remote_ip)) < 0) {
      continue;
    }

    if (strcmp(remote_ip, ftl->ingest_ip) != 0)
    {
      FTL_LOG(ftl, FTL_LOG_WARN, "Discarded packet from unexpected ip: %s\n", remote_ip);
      continue;
    }

    int version, padding, feedbackType, ptype, length, ssrcSender, ssrcMedia;
    uint16_t snBase, blp, sn;
    int recv_len = ret;

    if (recv_len < 2) {
      FTL_LOG(ftl, FTL_LOG_WARN, "recv packet too small to parse, discarding\n");
      continue;
    }

    /*extract rtp header*/
    version = (buf[0] >> 6) & 0x3;
    padding = (buf[0] >> 5) & 0x1;
    feedbackType = buf[0] & 0x1F;
    ptype = buf[1];

    if (feedbackType == 1 && ptype == 205) {

      length = ntohs(*((uint16_t*)(buf + 2)));

      if (recv_len < ((length + 1) * 4)) {
        FTL_LOG(ftl, FTL_LOG_WARN, "reported len was %d but packet is only %d...discarding\n", recv_len, ((length + 1) * 4));
        continue;
      }

      ssrcSender = ntohl(*((uint32_t*)(buf + 4)));
      ssrcMedia = ntohl(*((uint32_t*)(buf + 8)));

      uint16_t *p = (uint16_t *)(buf + 12);

      int fci;
      for (fci = 0; fci < (length - 2); fci++) {
        //request the first sequence number
        snBase = ntohs(*p++);
        _nack_resend_packet(ftl, ssrcMedia, snBase);
        blp = ntohs(*p++);
        if (blp) {
          int i;
          for (i = 0; i < 16; i++) {
            if ((blp & (1 << i)) != 0) {
              sn = snBase + i + 1;
              _nack_resend_packet(ftl, ssrcMedia, sn);
            }
          }
        }
      }
    }
    else if (feedbackType == 1 && ptype == PING_PTYPE) {

      ping_pkt_t *ping = (ping_pkt_t *)buf;

      struct timeval now;
      int delay_ms;
      media_stats_t *pkt_stats = &ftl->video.media_component.stats;

      gettimeofday(&now, NULL);
      delay_ms = (int)timeval_subtract_to_ms(&now, &ping->xmit_time);

      if (delay_ms > pkt_stats->pkt_rtt_max) {
        pkt_stats->pkt_rtt_max = delay_ms;
      }
      else if (delay_ms < pkt_stats->pkt_rtt_min) {
        pkt_stats->pkt_rtt_min = delay_ms;
      }

      pkt_stats->total_rtt += delay_ms;
      pkt_stats->rtt_samples++;

      ftl->media.last_rtt_delay = delay_ms;
    }
  }

  free(buf);

  FTL_LOG(ftl, FTL_LOG_INFO, "Exited Recv Thread\n");

  return (OS_THREAD_TYPE)0;
}