ActionStatus handle_gets()

in holo-client-c/src/worker.c [868:948]


ActionStatus handle_gets(ConnectionHolder* connHolder, HoloTableSchema* schema, dlist_head* gets, int nRecords) {
    SqlCache* sqlCache = connection_holder_get_or_create_get_sql_cache(connHolder, schema, nRecords);
    int nParams = nRecords * schema->nPrimaryKeys;
    char** params = MALLOC(nParams, char*);
    dlist_iter iter;
    GetItem* getItem;
    int count = -1;
    dlist_foreach(iter, gets){
        getItem = dlist_container(GetItem, list_node, iter.cur);
        for (int i = 0;i < schema->nPrimaryKeys;i++){
            int col = schema->primaryKeys[i];
            params[++count] = getItem->get->record->values[col];
            sqlCache->paramLengths[count] = getItem->get->record->valueLengths[col];
            sqlCache->paramFormats[count] = getItem->get->record->valueFormats[col];
        }
    }

    PGresult* res = NULL;
    res = connection_holder_exec_params_with_retry(connHolder, sqlCache->command, nParams, sqlCache->paramTypes, (const char* const*)params, sqlCache->paramLengths, sqlCache->paramFormats, 0, NULL);
    if (res == NULL || PQresultStatus(res) != PGRES_TUPLES_OK){
        LOG_ERROR("Get from table \"%s\" as batch failed.", schema->tableName->tableName);
        if (res != NULL) {
            LOG_ERROR("Error msg: %s", PQresultErrorMessage(res));
            PQclear(res);
        }
        FREE(params);
        return FAILURE_NOT_NEED_RETRY;
    }
    metrics_meter_mark(connHolder->metrics->rps, nRecords);
    metrics_meter_mark(connHolder->metrics->qps, 1);

    LPMap* map = connHolder->map;
    int nTuples = PQntuples(res);
    int M = nTuples > 0 ? nTuples * 2 : 1;
    for (int i = 0; i < nTuples; i++) {
        res_tuple_to_map(res, i, schema, map, M);
    }
    dlist_foreach(iter, gets){
        int resNum = -1;
        getItem = dlist_container(GetItem, list_node, iter.cur);
        int index = record_pk_hash_code(getItem->get->record, M);
        
        for (int i = 0; i < M; i++){
            if(map->values[index] == NULL) break;
            bool same = true;
            int temp = ((intptr_t)map->values[index]) - 1;
            for (int j = 0; j < schema->nPrimaryKeys; j++) {
                int col = schema->primaryKeys[j];
                if (strcmp(getItem->get->record->values[col], PQgetvalue(res, temp, col)) != 0) {
                    same = false;
                    break;
                }
            }
            if (same) {
                resNum = temp;
                break;
            }
            index = (index + 1) % M;
        }

        if (resNum == -1) {
            complete_future(getItem->get->future, NULL);
        } else {
            HoloRecord* resRecord = holo_client_new_record(schema);
            for (int n = 0; n < schema->nColumns; n++) {
                char* value = PQgetvalue(res, resNum, n);
                int len = strlen(value);
                char* ptr = (char*)new_record_val(resRecord, len + 1);
                deep_copy_string_to(value, ptr, len + 1);
                set_record_val(resRecord, n, ptr, 0, len + 1);
            }
            complete_future(getItem->get->future, resRecord);
        }
    }

    if (res != NULL) PQclear(res);
    FREE(params);
    holo_client_clear_lp_map(map);

    return SUCCESS;
}