holo-client-c/src/worker.c (865 lines of code) (raw):

#include "worker.h" #include "logger_private.h" #include <libpq-fe.h> #include "utils.h" #include "action.h" #include <stdlib.h> #include "table_schema_private.h" #include "batch.h" #include "sql_builder.h" #include "exception.h" #include "inttypes.h" #define MAX_PARAM_NUM 32767 // max number of parameters in PG's prepared statement typedef ActionStatus (*BatchHandler)(ConnectionHolder*, Batch*, dlist_node**, int, char**); ActionStatus get_holo_version(ConnectionHolder*); ActionStatus get_table_schema(ConnectionHolder*, HoloTableSchema*, HoloTableName, char**); ActionStatus handle_mutations(ConnectionHolder*, dlist_head*, char**); void add_mutation_item_to_batch_list(MutationItem*, dlist_head*); ActionStatus handle_batch_list(dlist_head*, ConnectionHolder*, char**); ActionStatus handle_batch(Batch*, int, BatchHandler, ConnectionHolder*, char**); ActionStatus exec_batch(ConnectionHolder*, Batch*, dlist_node**, int, char**); ActionStatus exec_batch_one_by_one(ConnectionHolder*, Batch*, dlist_node**, int, char**); ActionStatus delete_batch(ConnectionHolder*, Batch*, dlist_node**, int); ActionStatus handle_sql(ConnectionHolder* connHolder, SqlFunction sqlFunction, void* arg, void** retAddr); ActionStatus handle_gets(ConnectionHolder*, HoloTableSchema*, dlist_head*, int); extern int unnest_convert_array_to_text(char**, char**, int*, int); extern void unnest_convert_array_to_postgres_binary(char*, void*, int, int, int); extern void convert_text_array_to_postgres_binary(char*, char**, int, int); Worker* holo_client_new_worker(HoloConfig config, int index, bool isFixedFe) { Worker* worker = MALLOC(1, Worker); worker->connHolder = holo_client_new_connection_holder(config, isFixedFe); worker->action = NULL; worker->config = config; if (isFixedFe) { worker->config.connInfo = generate_fixed_fe_conn_info(config.connInfo); } else { worker->config.connInfo = deep_copy_string(config.connInfo); } worker->index = index; worker->status = 0; worker->thread = MALLOC(1, pthread_t); worker->mutex = MALLOC(1, pthread_mutex_t); worker->cond = MALLOC(1, pthread_cond_t); pthread_mutex_init(worker->mutex, NULL); pthread_cond_init(worker->cond, NULL); worker->metrics = holo_client_new_metrics_in_worker(); worker->connHolder->metrics = worker->metrics; worker->lastUpdateTime = current_time_ms(); worker->idleMutex = NULL; worker->idleCond = NULL; worker->map = holo_client_new_lp_map(config.readBatchSize); worker->connHolder->map = worker->map; return worker; } void* worker_run(void* workerPtr) { Worker* worker = workerPtr; pthread_mutex_lock(worker->mutex); while(worker->status == 1) { if(worker->action != NULL) { metrics_histogram_update(worker->metrics->idleTime, current_time_ms() - worker->lastUpdateTime); worker->lastUpdateTime = current_time_ms(); ActionStatus rc; switch(worker->action->type){ case 0: rc = connection_holder_do_action(worker->connHolder, worker->action, handle_meta_action); break; case 1: rc = connection_holder_do_action(worker->connHolder, worker->action, handle_mutation_action); break; case 2: rc = connection_holder_do_action(worker->connHolder, worker->action, handle_sql_action); break; case 3: rc = connection_holder_do_action(worker->connHolder, worker->action, handle_get_action); break; default: rc = FAILURE_NOT_NEED_RETRY; } if (rc != SUCCESS) worker_abort_action(worker); metrics_histogram_update(worker->metrics->handleActionTime, current_time_ms() - worker->lastUpdateTime); worker->lastUpdateTime = current_time_ms(); worker->action = NULL; } pthread_cond_signal(worker->idleCond); struct timespec out_time = get_out_time(worker->config.connectionMaxIdleMs); if (pthread_cond_timedwait(worker->cond, worker->mutex, &out_time) != 0){ //空闲超时,关闭连接 LOG_INFO("Worker %d idle time out.", worker->index); connection_holder_close_conn(worker->connHolder); pthread_cond_signal(worker->idleCond); pthread_cond_wait(worker->cond, worker->mutex); }; } worker->status = 3; connection_holder_close_conn(worker->connHolder); pthread_mutex_unlock(worker->mutex); return NULL; } int holo_client_start_worker(Worker* worker) { int rc; worker->status = 1; metrics_meter_reset(worker->metrics->qps); metrics_meter_reset(worker->metrics->rps); worker->lastUpdateTime = current_time_ms(); rc = pthread_create(worker->thread, NULL, worker_run, worker); if (rc != 0) { worker->status = 4; LOG_ERROR("Worker %d started failed with error code %d.", worker->index, rc); } LOG_DEBUG("Worker %d started.", worker->index); return rc; } int holo_client_stop_worker(Worker* worker) { int rc; pthread_mutex_lock(worker->mutex); worker->status = 2; pthread_cond_signal(worker->cond); pthread_mutex_unlock(worker->mutex); rc = pthread_join(*worker->thread, NULL); LOG_DEBUG("Worker %d stopped.", worker->index); worker->status = 3; return rc; } void holo_client_close_worker(Worker* worker) { //释放资源 pthread_mutex_destroy(worker->mutex); pthread_cond_destroy(worker->cond); FREE(worker->connHolder->holoVersion); FREE(worker->connHolder->connInfo); FREE(worker->connHolder); FREE(worker->config.connInfo); FREE(worker->thread); FREE(worker->mutex); FREE(worker->cond); holo_client_destroy_metrics_in_worker(worker->metrics); holo_client_destroy_lp_map(worker->map); FREE(worker); worker = NULL; } bool holo_client_try_submit_action_to_worker(Worker* worker, Action* action) { bool success = false; if(pthread_mutex_trylock(worker->mutex) == 0) { if(worker->status == 1 && worker->action == NULL) { worker->action = action; success = true; pthread_cond_signal(worker->cond); } pthread_mutex_unlock(worker->mutex); } return success; } ActionStatus handle_meta_action(ConnectionHolder* connHolder, Action* action) { ActionStatus rc = get_holo_version(connHolder); // 若获取holo版本失败,仍然尝试获取table schema HoloTableSchema* schema = holo_client_new_tableschema(); rc = get_table_schema(connHolder, schema, ((MetaAction*)action)->meta->tableName, &(((MetaAction*)action)->meta->future->errMsg)); if (rc != SUCCESS) return rc; complete_future(((MetaAction*)action)->meta->future, schema); holo_client_destroy_meta_action((MetaAction*)action); return rc; } ActionStatus handle_mutation_action(ConnectionHolder* connHolder, Action* action) { ActionStatus rc = handle_mutations(connHolder, &((MutationAction*)action)->requests, &(((MutationAction*)action)->future->errMsg)); if (rc != SUCCESS) return rc; complete_future(((MutationAction*)action)->future, action); return rc; } ActionStatus handle_sql_action(ConnectionHolder* connHolder, Action* action) { void* retVal = NULL; ActionStatus rc = handle_sql(connHolder, ((SqlAction*)action)->sql->sqlFunction, ((SqlAction*)action)->sql->arg, &retVal); if (rc != SUCCESS) return rc; complete_future(((SqlAction*)action)->sql->future, retVal); holo_client_destroy_sql_action((SqlAction*)action); return rc; } ActionStatus handle_get_action(ConnectionHolder* connHolder, Action* action) { // LOG_DEBUG("num get requests: %d", ((GetAction*)action)->numRequests); ActionStatus rc = handle_gets(connHolder, ((GetAction*)action)->schema, &((GetAction*)action)->requests, ((GetAction*)action)->numRequests); if (rc != SUCCESS) return rc; holo_client_destroy_get_action((GetAction*)action); return rc; } void worker_abort_action(Worker* worker){ switch(worker->action->type){ case 0: complete_future(((MetaAction*)worker->action)->meta->future, NULL); holo_client_destroy_meta_action((MetaAction*)worker->action); break; case 1: complete_future(((MutationAction*)worker->action)->future, NULL); break; case 2: complete_future(((SqlAction*)worker->action)->sql->future, NULL); holo_client_destroy_sql_action((SqlAction*)worker->action); break; case 3: abort_get_action((GetAction*)worker->action); holo_client_destroy_get_action((GetAction*)worker->action); break; default: LOG_ERROR("Worker abort action falied. Invalid action type: %d", worker->action->type); break; } } ActionStatus get_holo_version(ConnectionHolder* connHolder) { PGresult* res = NULL; const char* findHgVersion = "select hg_version()"; const char* findHoloVersion = "select version()"; res = connection_holder_exec_params_with_retry(connHolder, findHgVersion, 0, NULL, NULL, NULL, NULL, 0, NULL); if (res == NULL || PQresultStatus(res) != PGRES_TUPLES_OK || PQntuples(res) == 0){ LOG_WARN("Get Hg Version failed."); if (res != NULL) { PQclear(res); } return FAILURE_NOT_NEED_RETRY; } else { char* hgVersionStr = deep_copy_string(PQgetvalue(res, 0, 0)); int cnt = sscanf(hgVersionStr, "Hologres %d.%d.%d", &connHolder->holoVersion->majorVersion, &connHolder->holoVersion->minorVersion, &connHolder->holoVersion->fixVersion); FREE(hgVersionStr); if (res != NULL) { PQclear(res); } if (cnt > 0) { LOG_DEBUG("Get Hg Version: %d.%d.%d", connHolder->holoVersion->majorVersion, connHolder->holoVersion->minorVersion, connHolder->holoVersion->fixVersion); return SUCCESS; } else { LOG_WARN("Get Hg Version failed."); return FAILURE_NOT_NEED_RETRY; } } res = connection_holder_exec_params_with_retry(connHolder, findHoloVersion, 0, NULL, NULL, NULL, NULL, 0, NULL); if (res == NULL || PQresultStatus(res) != PGRES_TUPLES_OK || PQntuples(res) == 0){ LOG_WARN("Get Holo Version failed."); if (res != NULL) { PQclear(res); } return FAILURE_NOT_NEED_RETRY; } else { char* holoVersionStr = deep_copy_string(PQgetvalue(res, 0, 0)); int cnt = sscanf(holoVersionStr, "release-%d.%d.%d", &connHolder->holoVersion->majorVersion, &connHolder->holoVersion->minorVersion, &connHolder->holoVersion->fixVersion); FREE(holoVersionStr); if (res != NULL) { PQclear(res); } if (cnt > 0) { LOG_DEBUG("Get Holo Version: %d.%d.%d", connHolder->holoVersion->majorVersion, connHolder->holoVersion->minorVersion, connHolder->holoVersion->fixVersion); return SUCCESS; } else { LOG_WARN("Get Holo Version failed."); return FAILURE_NOT_NEED_RETRY; } } } ActionStatus get_table_schema(ConnectionHolder* connHolder, HoloTableSchema* schema, HoloTableName tableName, char** errMsgAddr) { HoloColumn* columns = NULL; const char* findTableOidSql = "SELECT property_value FROM hologres.hg_table_properties WHERE table_namespace = $1 AND table_name = $2 AND property_key = 'table_id'"; const char* findColumnsSql = "WITH c AS (SELECT column_name, ordinal_position, is_nullable, column_default FROM information_schema.columns WHERE table_schema = $1 AND table_name = $2), a AS (SELECT attname, atttypid from pg_catalog.pg_attribute WHERE attrelid = $3::regclass::oid) SELECT * FROM c LEFT JOIN a ON c.column_name = a.attname;"; const char* findPrimaryKeysSql = "SELECT c.column_name, cc.ordinal_position FROM information_schema.key_column_usage AS c LEFT JOIN information_schema.table_constraints AS t ON t.constraint_name = c.constraint_name AND c.table_schema = t.table_schema AND c.table_name = t.table_name LEFT JOIN information_schema.columns cc ON c.table_schema = cc.table_schema AND c.table_name = cc.table_name AND c.column_name = cc.column_name WHERE t.table_schema = $1 AND t.table_name = $2 AND t.constraint_type = 'PRIMARY KEY'"; const char* findDistributionKeysSql = "WITH d AS (SELECT table_namespace, table_name, unnest(string_to_array(property_value, ',')) as column_name from hologres.hg_table_properties WHERE table_namespace = $1 AND table_name = $2 AND property_key = 'distribution_key') SELECT c.column_name, c.ordinal_position FROM d LEFT JOIN information_schema.columns c ON d.table_namespace = c.table_schema AND d.table_name=c.table_name AND d.column_name = c.column_name"; const char* findPartitionColumnSql = "SELECT partattrs FROM pg_partitioned_table WHERE partrelid = $1::regclass::oid"; PGresult* res = NULL; int nTuples, i, pos = 0; char oid[11]; // use prepared statement, so there's no need to quote_literal_cstr() before use const char* name[1] = {tableName.fullName}; const char* names[2] = {tableName.schemaName, tableName.tableName}; const char* names3[3] = {tableName.schemaName, tableName.tableName, tableName.fullName}; //get table oid res = connection_holder_exec_params_with_retry(connHolder, findTableOidSql, 2, NULL, names, NULL, NULL, 0, errMsgAddr); if (res == NULL || PQresultStatus(res) != PGRES_TUPLES_OK || PQntuples(res) == 0){ LOG_ERROR("Get table Oid of table %s failed.", tableName.fullName); holo_client_destroy_tableschema(schema); if (res != NULL) PQclear(res); return FAILURE_NOT_NEED_RETRY; } else { schema->tableId = atoi(PQgetvalue(res, 0, 0)); } if (res != NULL) PQclear(res); //get column_name, data_type_oid, is_nullable, default_value of each column sprintf(oid, "%d", schema->tableId); res = connection_holder_exec_params_with_retry(connHolder, findColumnsSql, 3, NULL, names3, NULL, NULL, 0, errMsgAddr); if (res == NULL || PQresultStatus(res) != PGRES_TUPLES_OK || PQntuples(res) == 0){ LOG_ERROR("Get column info of table %s failed.", tableName.fullName); if (res != NULL) PQclear(res); return FAILURE_NOT_NEED_RETRY; } else { nTuples = PQntuples(res); schema->nColumns = nTuples; columns = holo_client_new_columns(nTuples); for (i = 0; i < nTuples; i++) { pos = atoi(PQgetvalue(res, i, 1)) - 1; if (pos >= 0 && pos < nTuples) { columns[pos].name = deep_copy_string(PQgetvalue(res, i, 0)); columns[pos].quoted = quote_identifier(columns[pos].name); columns[pos].type = atoi(PQgetvalue(res, i, 5)); if (strcmp(PQgetvalue(res, i, 2), "YES") == 0) { columns[pos].nullable = true; } else { columns[pos].nullable = false; } columns[pos].isPrimaryKey = false; if (PQgetisnull(res, i, 3)) { columns[pos].defaultValue = NULL; } else { columns[pos].defaultValue = deep_copy_string(PQgetvalue(res, i, 3)); } } } schema->columns = columns; } if (res != NULL) PQclear(res); //find primary keys res = connection_holder_exec_params_with_retry(connHolder, findPrimaryKeysSql, 2, NULL, names, NULL, NULL, 0, errMsgAddr); if (res == NULL || PQresultStatus(res) != PGRES_TUPLES_OK){ LOG_ERROR("Get primary keys info of table %s failed.", tableName.fullName); holo_client_destroy_tableschema(schema); if (res != NULL) PQclear(res); return FAILURE_NOT_NEED_RETRY; } else { nTuples = PQntuples(res); schema->nPrimaryKeys = nTuples; FREE(schema->primaryKeys); if (nTuples > 0){ schema->primaryKeys = MALLOC(nTuples, int); } for (i = 0; i < nTuples; i++) { pos = atoi(PQgetvalue(res, i, 1)) - 1; if (pos >= 0 && pos < schema->nColumns) { columns[pos].isPrimaryKey = true; schema->primaryKeys[i] = pos; } } } if (res != NULL) PQclear(res); //find distribution keys res = connection_holder_exec_params_with_retry(connHolder, findDistributionKeysSql, 2, NULL, names, NULL, NULL, 0, errMsgAddr); if (res == NULL || PQresultStatus(res) != PGRES_TUPLES_OK){ LOG_ERROR("Get distribution keys info of table %s failed.", tableName.fullName); holo_client_destroy_tableschema(schema); if (res != NULL) PQclear(res); return FAILURE_NOT_NEED_RETRY; } else { nTuples = PQntuples(res); schema->nDistributionKeys = nTuples; if (nTuples > 0) { schema->distributionKeys = MALLOC(nTuples, int); } for (i = 0; i < nTuples; i++) { pos = atoi(PQgetvalue(res, i, 1)) - 1; if (pos >= 0 && pos < schema->nColumns) { schema->distributionKeys[i] = pos; } } } if (res != NULL) PQclear(res); //find partition column res = connection_holder_exec_params_with_retry(connHolder, findPartitionColumnSql, 1, NULL, name, NULL, NULL, 0, errMsgAddr); if (res == NULL || PQresultStatus(res) != PGRES_TUPLES_OK){ LOG_ERROR("Get partition column of table %s failed.", tableName.fullName); holo_client_destroy_tableschema(schema); if (res != NULL) PQclear(res); return FAILURE_NOT_NEED_RETRY; } else if (PQntuples(res) == 0) { schema->partitionColumn = -1; } else { schema->partitionColumn = atoi(PQgetvalue(res, 0, 0)) - 1; } if (res != NULL) PQclear(res); //deep copy table name schema->tableName->fullName = deep_copy_string(tableName.fullName); schema->tableName->schemaName = deep_copy_string(tableName.schemaName); schema->tableName->tableName = deep_copy_string(tableName.tableName); return SUCCESS; } ActionStatus handle_mutations(ConnectionHolder* connHolder, dlist_head* mutations, char** errMsgAddr){ if (dlist_is_empty(mutations)) return FAILURE_NOT_NEED_RETRY; dlist_head insertBatchList; dlist_init(&insertBatchList); dlist_head deleteBatchList; dlist_init(&deleteBatchList); dlist_mutable_iter miterMutation; MutationItem* mutationItem; dlist_foreach_modify(miterMutation, mutations) { mutationItem = dlist_container(MutationItem, list_node, miterMutation.cur); if (mutationItem->mutation->mode == DELETE){ add_mutation_item_to_batch_list(mutationItem, &deleteBatchList); } else { add_mutation_item_to_batch_list(mutationItem, &insertBatchList); } } ActionStatus rc = SUCCESS; rc = handle_batch_list(&deleteBatchList, connHolder, errMsgAddr) != SUCCESS ? FAILURE_NOT_NEED_RETRY : rc; rc = handle_batch_list(&insertBatchList, connHolder, errMsgAddr) != SUCCESS ? FAILURE_NOT_NEED_RETRY : rc; return rc; } void add_mutation_item_to_batch_list(MutationItem* mutationItem, dlist_head* batchList) { dlist_mutable_iter miterBatch; BatchItem* batchItem; bool applied = false; dlist_foreach_modify(miterBatch, batchList) { batchItem = dlist_container(BatchItem, list_node, miterBatch.cur); if (batch_try_apply_mutation_request(batchItem->batch, mutationItem->mutation)) { applied = true; break; } } if (!applied){ Batch* newBatch = holo_client_new_batch_with_mutation_request(mutationItem->mutation); dlist_push_tail(batchList, &(create_batch_item(newBatch)->list_node)); } } ActionStatus handle_batch_list(dlist_head* batchList, ConnectionHolder* connHolder, char** errMsgAddr) { if (dlist_is_empty(batchList)) { return SUCCESS; } dlist_mutable_iter miterBatch; BatchItem* batchItem; ActionStatus rc = SUCCESS; dlist_foreach_modify(miterBatch, batchList) { batchItem = dlist_container(BatchItem, list_node, miterBatch.cur); if (!is_batch_support_unnest(connHolder, batchItem->batch)) { int maxSize = get_max_pow(batchItem->batch->nRecords); // make sure that num of parameters in one batch is less than MAX_PARAM_NUM in PG while (maxSize * batchItem->batch->nValues > MAX_PARAM_NUM) { maxSize >>= 1; } rc = handle_batch(batchItem->batch, maxSize, exec_batch, connHolder, errMsgAddr); } else { batchItem->batch->isSupportUnnest = true; rc = handle_batch(batchItem->batch, batchItem->batch->nRecords, exec_batch, connHolder, errMsgAddr); } holo_client_destroy_batch(batchItem->batch); dlist_delete(miterBatch.cur); FREE(batchItem); } return rc; } ActionStatus handle_batch(Batch* batch, int maxSize ,BatchHandler do_handle_batch , ConnectionHolder* connHolder, char** errMsgAddr){ ActionStatus rc = SUCCESS; dlist_node* current = dlist_head_node(&batch->recordList); int remainRecords = batch->nRecords; while (remainRecords != 0){ int nRecords = maxSize; if (remainRecords < maxSize){ nRecords = 1; int tRecords = remainRecords; while ((tRecords >>= 1) != 0) nRecords <<= 1; remainRecords -= nRecords; } else remainRecords -= maxSize; ActionStatus t = do_handle_batch(connHolder, batch, &current, nRecords, errMsgAddr); if (t != SUCCESS) rc = FAILURE_NOT_NEED_RETRY; } return rc; } int get_val_len_by_type_oid(unsigned int typeOid) { switch (typeOid) { case HOLO_TYPE_INT4: case HOLO_TYPE_FLOAT4: return 4; case HOLO_TYPE_INT8: case HOLO_TYPE_FLOAT8: case HOLO_TYPE_TIMESTAMP: case HOLO_TYPE_TIMESTAMPTZ: return 8; case HOLO_TYPE_INT2: return 2; case HOLO_TYPE_BOOL: return 1; default: LOG_ERROR("Varlena type cannot get fixed value length."); break; } return -1; } unsigned int get_array_oid_by_type_oid(unsigned int typeOid) { switch (typeOid) { case HOLO_TYPE_INT4: return HOLO_TYPE_INT4_ARRAY; case HOLO_TYPE_INT8: return HOLO_TYPE_INT8_ARRAY; case HOLO_TYPE_INT2: return HOLO_TYPE_INT2_ARRAY; case HOLO_TYPE_BOOL: return HOLO_TYPE_BOOL_ARRAY; case HOLO_TYPE_FLOAT4: return HOLO_TYPE_FLOAT4_ARRAY; case HOLO_TYPE_FLOAT8: return HOLO_TYPE_FLOAT8_ARRAY; case HOLO_TYPE_TIMESTAMP: return HOLO_TYPE_TIMESTAMP_ARRAY; case HOLO_TYPE_TIMESTAMPTZ: return HOLO_TYPE_TIMESTAMPTZ_ARRAY; case HOLO_TYPE_CHAR: return HOLO_TYPE_CHAR_ARRAY; case HOLO_TYPE_VARCHAR: return HOLO_TYPE_VARCHAR_ARRAY; case HOLO_TYPE_TEXT: return HOLO_TYPE_TEXT_ARRAY; case HOLO_TYPE_BYTEA: return HOLO_TYPE_BYTEA_ARRAY; case HOLO_TYPE_JSON: return HOLO_TYPE_JSON_ARRAY; case HOLO_TYPE_JSONB: return HOLO_TYPE_JSONB_ARRAY; case HOLO_TYPE_DATE: return HOLO_TYPE_DATE_ARRAY; case HOLO_TYPE_NUMERIC: return HOLO_TYPE_NUMERIC_ARRAY; default: LOG_ERROR("Unsupported array type, origin type: %d.", typeOid); break; } return 0; } int get_convert_mode_for_unnest(Batch* batch, int colIdx) { int convertMode = 0; switch (batch->schema->columns[colIdx].type) { case HOLO_TYPE_TIMESTAMP: case HOLO_TYPE_TIMESTAMPTZ: if (batch->valueFormats[colIdx] == 1) { convertMode = 1; } else { convertMode = 3; } break; case HOLO_TYPE_INT4: case HOLO_TYPE_INT8: case HOLO_TYPE_INT2: case HOLO_TYPE_BOOL: case HOLO_TYPE_FLOAT4: case HOLO_TYPE_FLOAT8: convertMode = 1; break; case HOLO_TYPE_CHAR: case HOLO_TYPE_VARCHAR: case HOLO_TYPE_TEXT: case HOLO_TYPE_JSON: case HOLO_TYPE_JSONB: convertMode = 3; break; case HOLO_TYPE_BYTEA: case HOLO_TYPE_NUMERIC: case HOLO_TYPE_DATE: convertMode = 3; break; default: LOG_ERROR("Generate convertMode failed for unnest, type is %d.", batch->schema->columns[colIdx].type); break; } return convertMode; } ActionStatus exec_batch(ConnectionHolder* connHolder, Batch* batch, dlist_node** current, int nRecords, char** errMsgAddr){ if (batch->nRecords <= 0){ LOG_WARN("Nothing to insert."); return FAILURE_NOT_NEED_RETRY; } if (batch->nRecords == 1){ LOG_DEBUG("Single record in batch."); return exec_batch_one_by_one(connHolder, batch, current, nRecords, errMsgAddr); } if (nRecords == 0) nRecords = batch->nRecords; SqlCache* sqlCache = connection_holder_get_or_create_sql_cache_with_batch(connHolder, batch, nRecords); int nParams; char** params; dlist_mutable_iter miter; miter.cur = *current; RecordItem* recordItem; int64_t minSeq = INT64_MAX; int64_t maxSeq = INT64_MIN; if (batch->isSupportUnnest && nRecords > 1) { nParams = batch->nValues; params = MALLOC(nParams, char*); int cParam = 0; for (int i = 0; i < batch->schema->nColumns; i++) { if (!batch->valuesSet[i]) continue; // 对于每个set value的列,创建数组 char** valueArray = MALLOC(nRecords, char*); int* lengthArray = MALLOC(nRecords, int); // 遍历recordList,取对应位置的value填充数组 int cRecords = 0; dlist_foreach_from(miter, &(batch->recordList), *current) { if (cRecords >= nRecords) break; recordItem = dlist_container(RecordItem, list_node, miter.cur); lengthArray[cRecords] = recordItem->record->valueLengths[i]; valueArray[cRecords++] = recordItem->record->values[i]; minSeq = recordItem->record->sequence < minSeq ? recordItem->record->sequence : minSeq; maxSeq = recordItem->record->sequence > maxSeq ? recordItem->record->sequence : maxSeq; } // 把这个数组变成一条record,插入param int length = 0; char* ptr = NULL; int convertMode = get_convert_mode_for_unnest(batch, i); switch (convertMode) { case 1: // binary转成binary写入 length = 20 + 4 * nRecords; for (int j = 0; j < nRecords; j++) { if (valueArray[j] == NULL) { continue; } length += get_val_len_by_type_oid(batch->schema->columns[i].type); } ptr = MALLOC(length, char); unnest_convert_array_to_postgres_binary(ptr, valueArray, nRecords, get_val_len_by_type_oid(batch->schema->columns[i].type), batch->schema->columns[i].type); sqlCache->paramTypes[cParam] = get_array_oid_by_type_oid(batch->schema->columns[i].type); sqlCache->paramFormats[cParam] = 1; params[cParam] = ptr; sqlCache->paramLengths[cParam++] = length; FREE(valueArray); FREE(lengthArray); break; case 2: // text转成binary写入 length = 20 + 4 * nRecords; for (int j = 0; j < nRecords; j++) { if (valueArray[j] == NULL) { continue; } length += strlen(valueArray[j]); } ptr = MALLOC(length, char); convert_text_array_to_postgres_binary(ptr, valueArray, nRecords, batch->schema->columns[i].type); sqlCache->paramTypes[cParam] = get_array_oid_by_type_oid(batch->schema->columns[i].type); sqlCache->paramFormats[cParam] = 1; params[cParam] = ptr; sqlCache->paramLengths[cParam++] = length; FREE(valueArray); FREE(lengthArray); break; case 3: // text转成text写入,参考pg jdbc的拼法 length = unnest_convert_array_to_text(&ptr, valueArray, lengthArray, nRecords); sqlCache->paramTypes[cParam] = get_array_oid_by_type_oid(batch->schema->columns[i].type); sqlCache->paramFormats[cParam] = 0; params[cParam] = ptr; sqlCache->paramLengths[cParam++] = length; FREE(valueArray); FREE(lengthArray); break; default: LOG_ERROR("Unknown convertMode in unnest"); break; } } } else { nParams = nRecords * batch->nValues; params = MALLOC(nParams, char*); int count = -1; int cRecords = 0; dlist_foreach_from(miter, &(batch->recordList), *current){ if (cRecords >= nRecords) break; recordItem = dlist_container(RecordItem, list_node, miter.cur); for (int i = 0;i < batch->schema->nColumns;i++){ if (!batch->valuesSet[i]) continue; params[++count] = recordItem->record->values[i]; sqlCache->paramLengths[count] = recordItem->record->valueLengths[i]; } minSeq = recordItem->record->sequence < minSeq ? recordItem->record->sequence : minSeq; maxSeq = recordItem->record->sequence > maxSeq ? recordItem->record->sequence : maxSeq; cRecords++; } } PGresult* res = NULL; res = connection_holder_exec_params_with_retry(connHolder, sqlCache->command, nParams, sqlCache->paramTypes, (const char* const*)params, sqlCache->paramLengths, sqlCache->paramFormats, 0, errMsgAddr); if (res == NULL || PQresultStatus(res) != PGRES_COMMAND_OK){ LOG_ERROR("Mutate into table \"%s\" as batch failed.", batch->schema->tableName->tableName); ActionStatus rc = SUCCESS; if (batch->isSupportUnnest && nRecords > 1 && nRecords * batch->nValues <= MAX_PARAM_NUM) { LOG_WARN("Retrying once without unnest..."); batch->isSupportUnnest = false; rc = exec_batch(connHolder, batch, current, nRecords, errMsgAddr); if (res != NULL) PQclear(res); for (int i = 0; i < nParams; i++) { FREE(params[i]); } FREE(params); return rc; } if (res != NULL && is_dirty_data_error(get_errcode_from_pg_res(res))) { LOG_WARN("Retrying one by one..."); //脏数据类型的异常,需要拆成一条一条重试 rc = exec_batch_one_by_one(connHolder, batch, current, nRecords, errMsgAddr); } else { //对于不one by one的情况,也要回调做异常处理,但是不提供record,只给出errMsg。用户可以通过record指针是否为NULL自行设计逻辑 connHolder->handleExceptionByUser(NULL, PQresultErrorMessage(res), connHolder->exceptionHandlerParam); rc = FAILURE_NOT_NEED_RETRY; } if (res != NULL) PQclear(res); if (batch->isSupportUnnest && nRecords > 1) { for (int i = 0; i < nParams; i++) { FREE(params[i]); } } FREE(params); return rc; } LOG_DEBUG("Mutate into table \"%s\" as batch succeed, minSeq:%"PRId64", maxSeq:%"PRId64".", batch->schema->tableName->tableName, minSeq, maxSeq); metrics_meter_mark(connHolder->metrics->rps, nRecords); metrics_meter_mark(connHolder->metrics->qps, 1); if (res != NULL) PQclear(res); if (batch->isSupportUnnest && nRecords > 1) { for (int i = 0; i < nParams; i++) { FREE(params[i]); } } FREE(params); *current = miter.cur; return SUCCESS; } ActionStatus exec_batch_one_by_one(ConnectionHolder* connHolder, Batch* batch, dlist_node** current, int nRecords, char** errMsgAddr){ if (batch->nRecords <= 0){ LOG_WARN("Nothing to insert."); return FAILURE_NOT_NEED_RETRY; } if (nRecords == 0) nRecords = batch->nRecords; SqlCache* sqlCache = connection_holder_get_or_create_sql_cache_with_batch(connHolder, batch, 1); PGresult* res = NULL; char** params = MALLOC(batch->nValues, char*); dlist_mutable_iter miter; RecordItem* recordItem; ActionStatus rc = SUCCESS; int cRecords = 0; int64_t minSeq = INT64_MAX; int64_t maxSeq = INT64_MIN; dlist_foreach_from(miter, &(batch->recordList), *current){ if (cRecords >= nRecords) break; recordItem = dlist_container(RecordItem, list_node, miter.cur); int count = -1; for (int i = 0;i < batch->schema->nColumns;i++){ if (!batch->valuesSet[i]) continue; params[++count] = recordItem->record->values[i]; sqlCache->paramLengths[count] = recordItem->record->valueLengths[i]; } minSeq = recordItem->record->sequence < minSeq ? recordItem->record->sequence : minSeq; maxSeq = recordItem->record->sequence > maxSeq ? recordItem->record->sequence : maxSeq; res = connection_holder_exec_params_with_retry(connHolder, sqlCache->command, batch->nValues, sqlCache->paramTypes, (const char* const*)params, sqlCache->paramLengths, sqlCache->paramFormats, 0, errMsgAddr); if (res == NULL || PQresultStatus(res) != PGRES_COMMAND_OK){ LOG_ERROR("Mutate into table \"%s\" failed.", batch->schema->tableName->tableName); connHolder->handleExceptionByUser(recordItem->record, PQresultErrorMessage(res), connHolder->exceptionHandlerParam); rc = FAILURE_NOT_NEED_RETRY; } if (res != NULL) PQclear(res); cRecords++; } LOG_DEBUG("Mutate into table \"%s\" one by one succeed, minSeq:%"PRId64", maxSeq:%"PRId64".", batch->schema->tableName->tableName, minSeq, maxSeq); metrics_meter_mark(connHolder->metrics->rps, nRecords); metrics_meter_mark(connHolder->metrics->qps, 1); FREE(params); *current = miter.cur; return rc; } ActionStatus handle_sql(ConnectionHolder* connHolder, SqlFunction sqlFunction, void* arg, void** retAddr) { connection_holder_exec_func_with_retry(connHolder, sqlFunction, arg, retAddr); return SUCCESS; } int res_tuple_hashcode(PGresult* res, int n, HoloTableSchema* schema, int size) { unsigned raw = 0; bool first = true; for (int i = 0;i < schema->nPrimaryKeys;i++){ int index = schema->primaryKeys[i]; char* value = PQgetvalue(res, n, index); int length = strlen(value) + 1; if (first){ MurmurHash3_x86_32(value, length, 0xf7ca7fd2, &raw); first = false; } else{ unsigned t = 0; MurmurHash3_x86_32(value, length, 0xf7ca7fd2, &t); raw ^= t; } } int hash = raw % ((unsigned)65536); int base = 65536 / size; int remain = 65536 % size; int pivot = (base + 1) * remain; int index = 0; if (hash < pivot) index = hash / (base + 1); else index = (hash - pivot) / base + remain; return index; } bool res_tuple_equals(PGresult* res, int n1, int n2, HoloTableSchema* schema) { for (int i = 0;i < schema->nPrimaryKeys;i++){ int index = schema->primaryKeys[i]; char* v1 = PQgetvalue(res, n1, index); char* v2 = PQgetvalue(res, n2, index); if (strcmp(v1, v2) != 0) return false; } return true; } void res_tuple_to_map(PGresult* res, int n, HoloTableSchema* schema, LPMap* map, int maxSize) { int index = res_tuple_hashcode(res, n, schema, maxSize); int M = maxSize; for(int i = 0; i < M; i++) { if(map->values[index] == NULL) { map->values[index] = (void*)(long)(n+1); map->size++; return; } if(res_tuple_equals(res, n, ((intptr_t)map->values[index])-1, schema)) { return; } index = (index + 1) % M; } } ActionStatus handle_gets(ConnectionHolder* connHolder, HoloTableSchema* schema, dlist_head* gets, int nRecords) { SqlCache* sqlCache = connection_holder_get_or_create_get_sql_cache(connHolder, schema, nRecords); int nParams = nRecords * schema->nPrimaryKeys; char** params = MALLOC(nParams, char*); dlist_iter iter; GetItem* getItem; int count = -1; dlist_foreach(iter, gets){ getItem = dlist_container(GetItem, list_node, iter.cur); for (int i = 0;i < schema->nPrimaryKeys;i++){ int col = schema->primaryKeys[i]; params[++count] = getItem->get->record->values[col]; sqlCache->paramLengths[count] = getItem->get->record->valueLengths[col]; sqlCache->paramFormats[count] = getItem->get->record->valueFormats[col]; } } PGresult* res = NULL; res = connection_holder_exec_params_with_retry(connHolder, sqlCache->command, nParams, sqlCache->paramTypes, (const char* const*)params, sqlCache->paramLengths, sqlCache->paramFormats, 0, NULL); if (res == NULL || PQresultStatus(res) != PGRES_TUPLES_OK){ LOG_ERROR("Get from table \"%s\" as batch failed.", schema->tableName->tableName); if (res != NULL) { LOG_ERROR("Error msg: %s", PQresultErrorMessage(res)); PQclear(res); } FREE(params); return FAILURE_NOT_NEED_RETRY; } metrics_meter_mark(connHolder->metrics->rps, nRecords); metrics_meter_mark(connHolder->metrics->qps, 1); LPMap* map = connHolder->map; int nTuples = PQntuples(res); int M = nTuples > 0 ? nTuples * 2 : 1; for (int i = 0; i < nTuples; i++) { res_tuple_to_map(res, i, schema, map, M); } dlist_foreach(iter, gets){ int resNum = -1; getItem = dlist_container(GetItem, list_node, iter.cur); int index = record_pk_hash_code(getItem->get->record, M); for (int i = 0; i < M; i++){ if(map->values[index] == NULL) break; bool same = true; int temp = ((intptr_t)map->values[index]) - 1; for (int j = 0; j < schema->nPrimaryKeys; j++) { int col = schema->primaryKeys[j]; if (strcmp(getItem->get->record->values[col], PQgetvalue(res, temp, col)) != 0) { same = false; break; } } if (same) { resNum = temp; break; } index = (index + 1) % M; } if (resNum == -1) { complete_future(getItem->get->future, NULL); } else { HoloRecord* resRecord = holo_client_new_record(schema); for (int n = 0; n < schema->nColumns; n++) { char* value = PQgetvalue(res, resNum, n); int len = strlen(value); char* ptr = (char*)new_record_val(resRecord, len + 1); deep_copy_string_to(value, ptr, len + 1); set_record_val(resRecord, n, ptr, 0, len + 1); } complete_future(getItem->get->future, resRecord); } } if (res != NULL) PQclear(res); FREE(params); holo_client_clear_lp_map(map); return SUCCESS; }