holo-client-c/src/get_collector.c (167 lines of code) (raw):
#include "get_collector.h"
#include "utils.h"
#include "action.h"
#include "logger_private.h"
TableGetCollector* holo_client_new_table_get_collector(HoloTableSchema* schema, HoloWorkerPool* pool, int batchSize, pthread_cond_t* signal) {
TableGetCollector* collector = MALLOC(1, TableGetCollector);
collector->schema = schema;
collector->pool = pool;
collector->numRequests = 0;
collector->batchSize = batchSize;
collector->mutex = MALLOC(1, pthread_mutex_t);
pthread_mutex_init(collector->mutex, NULL);
collector->requests = MALLOC(batchSize, HoloGet);
collector->signal = signal;
return collector;
}
void holo_client_destroy_table_get_collector(TableGetCollector* collector) {
flush_table_get_collector(collector);
pthread_mutex_destroy(collector->mutex);
FREE(collector->mutex);
FREE(collector->requests);
FREE(collector);
collector = NULL;
}
GetAction* do_flush_table_get_collector(TableGetCollector* collector) {
GetAction* action = NULL;
if (collector->numRequests == 0) return NULL;
action = holo_client_new_get_action();
for (int i = 0; i < collector->numRequests; i++) {
get_action_add_request(action, collector->requests[i]);
collector->requests[i] = NULL;
}
collector->numRequests = 0;
action->schema = collector->schema;
// LOG_DEBUG("num request: %d", action->numRequests);
return action;
}
void flush_table_get_collector(TableGetCollector* collector) {
GetAction* action = NULL;
if (collector->numRequests == 0) return;
pthread_mutex_lock(collector->mutex);
action = do_flush_table_get_collector(collector);
pthread_mutex_unlock(collector->mutex);
if (action != NULL) holo_client_submit_action_to_worker_pool(collector->pool, (Action*)action);
}
void table_get_collector_add_request(TableGetCollector* collector, HoloGet get) {
GetAction* action = NULL;
pthread_mutex_lock(collector->mutex);
collector->requests[collector->numRequests] = get;
collector->numRequests++;
get->submitted = true;
if (collector->numRequests == collector->batchSize) {
action = do_flush_table_get_collector(collector);
} else {
pthread_cond_signal(collector->signal);
}
pthread_mutex_unlock(collector->mutex);
if (action != NULL) holo_client_submit_action_to_worker_pool(collector->pool, (Action*)action);
}
GetCollector* holo_client_new_get_collector(HoloWorkerPool* pool, int batchSize) {
GetCollector* collector = MALLOC(1, GetCollector);
dlist_init(&(collector->tableCollectors));
collector->numTables = 0;
collector->actionWatcherThread = MALLOC(1, pthread_t);
collector->mutex = MALLOC(1, pthread_mutex_t);
collector->cond = MALLOC(1, pthread_cond_t);
pthread_mutex_init(collector->mutex, NULL);
pthread_cond_init(collector->cond, NULL);
collector->status = 0;
collector->pool = pool;
collector->batchSize = batchSize;
return collector;
}
void holo_client_destroy_get_collector(GetCollector* collector) {
TableGetCollectorItem* item;
TableGetCollector* tableCollector = NULL;
dlist_mutable_iter miter;
dlist_foreach_modify(miter, &(collector->tableCollectors)) {
item = dlist_container(TableGetCollectorItem, list_node, miter.cur);
tableCollector = item->tableGetCollector;
dlist_delete(miter.cur);
holo_client_destroy_table_get_collector(tableCollector);
FREE(item);
}
pthread_mutex_destroy(collector->mutex);
pthread_cond_destroy(collector->cond);
FREE(collector->actionWatcherThread);
FREE(collector->mutex);
FREE(collector->cond);
FREE(collector);
collector = NULL;
}
void holo_client_do_flush_get_collector(GetCollector* collector) {
dlist_iter iter;
TableGetCollector* tableCollector = NULL;
dlist_foreach(iter, &(collector->tableCollectors)) {
tableCollector = dlist_container(TableGetCollectorItem, list_node, iter.cur)->tableGetCollector;
flush_table_get_collector(tableCollector);
}
}
void* watch_get_collector_run(void* argsPtr) {
GetCollector* collector = argsPtr;
struct timespec out_time;
pthread_mutex_lock(collector->mutex);
while (collector->status == 1) {
holo_client_do_flush_get_collector(collector);
out_time = get_out_time(2000);
pthread_cond_timedwait(collector->cond, collector->mutex, &out_time);
}
collector->status = 3;
pthread_mutex_unlock(collector->mutex);
return NULL;
}
int holo_client_start_watch_get_collector(GetCollector* collector) {
int rc;
collector->status = 1;
rc = pthread_create(collector->actionWatcherThread, NULL, watch_get_collector_run, collector);
if (rc != 0) {
collector->status = 4;
LOG_ERROR("start get collector failed with error code %d", rc);
}
LOG_DEBUG("start watch get collector");
return rc;
}
int holo_client_stop_watch_get_collector(GetCollector* collector) {
int rc;
collector->status = 2;
pthread_cond_signal(collector->cond);
rc = pthread_join(*collector->actionWatcherThread, NULL);
collector->status = 3;
return rc;
}
TableGetCollectorItem* create_table_get_collector_item(TableGetCollector* tableCollector) {
TableGetCollectorItem* item = MALLOC(1, TableGetCollectorItem);
item->tableGetCollector = tableCollector;
return item;
}
TableGetCollector* find_table_get_collector(GetCollector* collector, HoloTableSchema* schema) {
TableGetCollector* tableCollector = NULL;
dlist_iter iter;
TableGetCollectorItem* item;
dlist_foreach(iter, &(collector->tableCollectors)) {
item = dlist_container(TableGetCollectorItem, list_node, iter.cur);
if (item->tableGetCollector->schema->tableId == schema->tableId) {
tableCollector = item->tableGetCollector;
break;
}
}
return tableCollector;
}
TableGetCollector* find_or_create_table_get_collector(GetCollector* collector, HoloTableSchema* schema) {
TableGetCollector* tableCollector = NULL;
tableCollector = find_table_get_collector(collector, schema);
if (tableCollector != NULL) return tableCollector;
pthread_mutex_lock(collector->mutex);
tableCollector = find_table_get_collector(collector, schema);
if (tableCollector == NULL) {
//create new table get collector
tableCollector = holo_client_new_table_get_collector(schema, collector->pool, collector->batchSize, collector->cond);
dlist_push_head(&(collector->tableCollectors), &(create_table_get_collector_item(tableCollector)->list_node));
collector->numTables++;
}
pthread_mutex_unlock(collector->mutex);
return tableCollector;
}
void holo_client_add_request_to_get_collector(GetCollector* collector, HoloGet get) {
TableGetCollector* tableCollector = find_or_create_table_get_collector(collector, get->record->schema);
table_get_collector_add_request(tableCollector, get);
}