ftl_status_t ftl_find_closest_available_ingest()

in libftl/ingest.c [95:202]


/**
 * Probe every host in ingestHosts concurrently and report the one with the
 * lowest measured round-trip time.
 *
 * One worker thread running _ingest_get_rtt is started per host (starts are
 * spaced 5 ms apart); the function then waits for all of them and picks the
 * entry with the smallest rtt. On ties the earliest entry in ingestHosts wins.
 *
 * @param ingestHosts             Array of ingestsCount NUL-terminated host
 *                                names. Neither the array nor any entry may
 *                                be NULL.
 * @param ingestsCount            Number of entries in ingestHosts; must be > 0.
 * @param bestIngestHostComputed  Output buffer receiving the selected host
 *                                name, NUL terminator included. No capacity
 *                                is passed in, so the caller must supply a
 *                                buffer at least as large as the longest
 *                                host name plus one byte.
 *
 * @return FTL_SUCCESS when a host name was written to the output buffer,
 *         FTL_MALLOC_FAILURE when an allocation failed, or
 *         FTL_UNKNOWN_ERROR_CODE when an argument is invalid.
 */
ftl_status_t ftl_find_closest_available_ingest(const char* ingestHosts[], int ingestsCount, char* bestIngestHostComputed)
{
    if (ingestHosts == NULL || ingestsCount <= 0 || bestIngestHostComputed == NULL) {
        return FTL_UNKNOWN_ERROR_CODE;
    }

    ftl_status_t ret_status = FTL_SUCCESS;
    ftl_ingest_t *ingestElements = NULL;
    OS_THREAD_HANDLE *handles = NULL;
    _tmp_ingest_thread_data_t *data = NULL;
    ftl_ingest_t *best = NULL;
    int i;

    /* calloc leaves every .name NULL, so the cleanup path can free all
       entries unconditionally even if we bail out half-way through. */
    ingestElements = calloc((size_t)ingestsCount, sizeof(*ingestElements));
    if (ingestElements == NULL) {
        ret_status = FTL_MALLOC_FAILURE;
        goto cleanup;
    }

    for (i = 0; i < ingestsCount; i++) {
        if (ingestHosts[i] == NULL) {
            ret_status = FTL_UNKNOWN_ERROR_CODE;
            goto cleanup;
        }

        size_t host_len = strlen(ingestHosts[i]) + 1;
        if ((ingestElements[i].name = malloc(host_len)) == NULL) {
            ret_status = FTL_MALLOC_FAILURE;
            goto cleanup;
        }
        strcpy_s(ingestElements[i].name, host_len, ingestHosts[i]);
        /* Starting rtt; this is the value compared below if the probe
           thread leaves the field untouched. */
        ingestElements[i].rtt = 1000;
        ingestElements[i].next = NULL;
    }

    /* calloc checks the count * size multiplication for overflow. */
    handles = calloc((size_t)ingestsCount, sizeof(*handles));
    if (handles == NULL) {
        ret_status = FTL_MALLOC_FAILURE;
        goto cleanup;
    }

    data = calloc((size_t)ingestsCount, sizeof(*data));
    if (data == NULL) {
        ret_status = FTL_MALLOC_FAILURE;
        goto cleanup;
    }

    /*query all the ingests about cpu and rtt*/
    for (i = 0; i < ingestsCount; i++) {
        /* A handle of 0 is treated below as "no thread to wait on". */
        handles[i] = 0;
        data[i].ingest = &ingestElements[i];
        data[i].ftl = NULL;
        os_create_thread(&handles[i], NULL, _ingest_get_rtt, &data[i]);
        sleep_ms(5); //space out the pings
    }

    /*wait for all the ingests to complete*/
    for (i = 0; i < ingestsCount; i++) {
        if (handles[i] != 0) {
            os_wait_thread(handles[i]);
        }

        /* Strict '<' keeps the earliest host on equal rtt. */
        if (best == NULL || ingestElements[i].rtt < best->rtt) {
            best = &ingestElements[i];
        }
    }

    for (i = 0; i < ingestsCount; i++) {
        if (handles[i] != 0) {
            os_destroy_thread(handles[i]);
        }
    }

    if (best != NULL) {
        /* The size argument must cover the NUL terminator; passing bare
           strlen() makes a conforming strcpy_s reject the copy. */
        strcpy_s(bestIngestHostComputed, strlen(best->name) + 1, best->name);
    } else {
        ret_status = FTL_UNKNOWN_ERROR_CODE;
    }

cleanup:
    if (ingestElements != NULL) {
        for (i = 0; i < ingestsCount; i++) {
            free(ingestElements[i].name);
        }
    }
    free(ingestElements);
    free(handles);
    free(data);

    return ret_status;
}