in libftl/ingest.c [95:202]
ftl_status_t ftl_find_closest_available_ingest(const char* ingestHosts[], int ingestsCount, char* bestIngestHostComputed)
{
if (ingestHosts == NULL || ingestsCount <= 0) {
return FTL_UNKNOWN_ERROR_CODE;
}
ftl_ingest_t* ingestElements = NULL;
OS_THREAD_HANDLE *handles = NULL;
_tmp_ingest_thread_data_t *data = NULL;
int i;
ftl_status_t ret_status = FTL_SUCCESS;
do {
if ((ingestElements = calloc(ingestsCount, sizeof(ftl_ingest_t))) == NULL) {
ret_status = FTL_MALLOC_FAILURE;
break;
}
for (i = 0; i < ingestsCount; i++) {
size_t host_len = strlen(ingestHosts[i]) + 1;
if ((ingestElements[i].name = malloc(host_len)) == NULL) {
ret_status = FTL_MALLOC_FAILURE;
break;
}
strcpy_s(ingestElements[i].name, host_len, ingestHosts[i]);
ingestElements[i].rtt = 1000;
ingestElements[i].next = NULL;
}
if (ret_status != FTL_SUCCESS) {
break;
}
if ((handles = (OS_THREAD_HANDLE *)malloc(sizeof(OS_THREAD_HANDLE) * ingestsCount)) == NULL) {
ret_status = FTL_MALLOC_FAILURE;
break;
}
if ((data = (_tmp_ingest_thread_data_t *)malloc(sizeof(_tmp_ingest_thread_data_t) * ingestsCount)) == NULL) {
ret_status = FTL_MALLOC_FAILURE;
break;
}
} while (0);
// malloc failed, cleanup
if (ret_status != FTL_SUCCESS) {
if (ingestElements != NULL) {
for (i = 0; i < ingestsCount; i++) {
free(ingestElements[i].name);
}
}
free(ingestElements);
free(handles);
free(data);
return ret_status;
}
ftl_ingest_t *best = NULL;
struct timeval start, stop, delta;
gettimeofday(&start, NULL);
/*query all the ingests about cpu and rtt*/
for (i = 0; i < ingestsCount; i++) {
handles[i] = 0;
data[i].ingest = &ingestElements[i];
data[i].ftl = NULL;
os_create_thread(&handles[i], NULL, _ingest_get_rtt, &data[i]);
sleep_ms(5); //space out the pings
}
/*wait for all the ingests to complete*/
for (i = 0; i < ingestsCount; i++) {
if (handles[i] != 0) {
os_wait_thread(handles[i]);
}
if (best == NULL || ingestElements[i].rtt < best->rtt) {
best = &ingestElements[i];
}
}
gettimeofday(&stop, NULL);
timeval_subtract(&delta, &stop, &start);
int ms = (int)timeval_to_ms(&delta);
for (i = 0; i < ingestsCount; i++) {
if (handles[i] != 0) {
os_destroy_thread(handles[i]);
}
}
free(handles);
free(data);
if (best) {
strcpy_s(bestIngestHostComputed, strlen(best->name), best->name);
} else {
ret_status = FTL_UNKNOWN_ERROR_CODE;
}
for (i = 0; i < ingestsCount; i++) {
free(ingestElements[i].name);
}
free(ingestElements);
return ret_status;
}