in holo-client-c/src/worker.c [868:948]
/*
 * Run one batched primary-key lookup for the Get requests queued in `gets`
 * and complete each request's future.
 *
 * connHolder: connection used to execute the statement; its metrics are
 *             updated and its `map` is used as scratch lookup space.
 * schema:     table schema; supplies the primary-key column indexes and the
 *             column count of the result rows.
 * gets:       list of GetItem nodes to serve.
 * nRecords:   number of requests; the parameter array is sized as
 *             nRecords * schema->nPrimaryKeys.
 *
 * Returns FAILURE_NOT_NEED_RETRY when the statement yields no result object
 * or a status other than PGRES_TUPLES_OK (no future is completed by this
 * function on that path), and SUCCESS otherwise. On success every future is
 * completed with a newly built record, or with NULL when no returned row
 * matches the request's primary key.
 */
ActionStatus handle_gets(ConnectionHolder* connHolder, HoloTableSchema* schema, dlist_head* gets, int nRecords) {
    SqlCache* sqlCache = connection_holder_get_or_create_get_sql_cache(connHolder, schema, nRecords);
    int nParams = nRecords * schema->nPrimaryKeys;
    char** params = MALLOC(nParams, char*);
    dlist_iter it;
    GetItem* item;

    /* Flatten the primary-key values of every request into one parameter list. */
    int slot = 0;
    dlist_foreach(it, gets) {
        item = dlist_container(GetItem, list_node, it.cur);
        for (int k = 0; k < schema->nPrimaryKeys; k++) {
            int pkCol = schema->primaryKeys[k];
            /* The value pointers are borrowed from the request record, not copied. */
            params[slot] = item->get->record->values[pkCol];
            sqlCache->paramLengths[slot] = item->get->record->valueLengths[pkCol];
            sqlCache->paramFormats[slot] = item->get->record->valueFormats[pkCol];
            slot++;
        }
    }

    PGresult* res = connection_holder_exec_params_with_retry(connHolder, sqlCache->command, nParams, sqlCache->paramTypes, (const char* const*)params, sqlCache->paramLengths, sqlCache->paramFormats, 0, NULL);
    if (!(res != NULL && PQresultStatus(res) == PGRES_TUPLES_OK)) {
        LOG_ERROR("Get from table \"%s\" as batch failed.", schema->tableName->tableName);
        if (res != NULL) {
            LOG_ERROR("Error msg: %s", PQresultErrorMessage(res));
            PQclear(res);
        }
        FREE(params);
        return FAILURE_NOT_NEED_RETRY;
    }

    metrics_meter_mark(connHolder->metrics->rps, nRecords);
    metrics_meter_mark(connHolder->metrics->qps, 1);

    /*
     * Index the returned rows in the connection's map. The table size is
     * twice the row count, or 1 when the result is empty.
     */
    LPMap* map = connHolder->map;
    int rowCount = PQntuples(res);
    int tableSize = 1;
    if (rowCount > 0) {
        tableSize = rowCount * 2;
    }
    for (int row = 0; row < rowCount; row++) {
        res_tuple_to_map(res, row, schema, map, tableSize);
    }

    dlist_foreach(it, gets) {
        item = dlist_container(GetItem, list_node, it.cur);

        /*
         * Linear probe starting at the key's hash slot. A NULL slot ends the
         * search; a non-NULL slot decodes to (row index + 1).
         */
        int matchedRow = -1;
        int probe = record_pk_hash_code(item->get->record, tableSize);
        for (int tries = 0; tries < tableSize && map->values[probe] != NULL; tries++) {
            int candidate = ((intptr_t)map->values[probe]) - 1;

            /* Count leading primary-key columns that compare equal as C strings. */
            /* NOTE(review): strcmp assumes text-format key values — confirm against valueFormats. */
            int matchedKeys = 0;
            while (matchedKeys < schema->nPrimaryKeys) {
                int pkCol = schema->primaryKeys[matchedKeys];
                if (strcmp(item->get->record->values[pkCol], PQgetvalue(res, candidate, pkCol)) != 0) {
                    break;
                }
                matchedKeys++;
            }
            if (matchedKeys == schema->nPrimaryKeys) {
                matchedRow = candidate;
                break;
            }
            probe = (probe + 1) % tableSize;
        }

        if (matchedRow == -1) {
            /* No returned row carries this key. */
            complete_future(item->get->future, NULL);
            continue;
        }

        /* Copy every column of the matched row into a new record. */
        /* NOTE(review): PQgetvalue yields "" for SQL NULL, so NULL columns are copied as empty strings — confirm this is intended. */
        HoloRecord* found = holo_client_new_record(schema);
        for (int c = 0; c < schema->nColumns; c++) {
            char* text = PQgetvalue(res, matchedRow, c);
            int textLen = strlen(text);
            int bufLen = textLen + 1; /* include the terminating NUL */
            char* copy = (char*)new_record_val(found, bufLen);
            deep_copy_string_to(text, copy, bufLen);
            set_record_val(found, c, copy, 0, bufLen);
        }
        complete_future(item->get->future, found);
    }

    /* res is known to be non-NULL here: the failure path above already returned. */
    PQclear(res);
    FREE(params);
    holo_client_clear_lp_map(map);
    return SUCCESS;
}