in holo-client-c/src/worker.c [596:760]
/**
 * Write up to nRecords records of a batch with a single parameterized statement.
 *
 * Records are read from batch->recordList starting at *current. Two parameter
 * layouts are used:
 *   - unnest: one array-valued parameter per set column (batch->isSupportUnnest
 *     and more than one record);
 *   - flat:   one parameter per (record, set column) pair.
 *
 * @param connHolder  connection used to run the statement; also supplies the
 *                    metrics meters and the user exception callback.
 * @param batch       batch whose records are written. batch->isSupportUnnest is
 *                    cleared by this function when an unnest attempt fails and
 *                    the flat layout is retried.
 * @param current     in: first list node to write; out: on success, the position
 *                    at which the record-list iteration stopped. Not modified by
 *                    this call on failure (the delegated retry paths receive the
 *                    same pointer and may update it themselves).
 * @param nRecords    number of records to write; 0 means batch->nRecords.
 * @param errMsgAddr  forwarded unchanged to the execution helpers.
 *
 * @return SUCCESS when the statement returned PGRES_COMMAND_OK;
 *         FAILURE_NOT_NEED_RETRY when the batch is empty, an unknown unnest
 *         convert mode is reported, or the statement failed with a non
 *         dirty-data error; otherwise the status of the delegated retry
 *         (exec_batch without unnest, or exec_batch_one_by_one).
 */
ActionStatus exec_batch(ConnectionHolder* connHolder, Batch* batch, dlist_node** current, int nRecords, char** errMsgAddr){
    if (batch->nRecords <= 0){
        LOG_WARN("Nothing to insert.");
        return FAILURE_NOT_NEED_RETRY;
    }
    if (batch->nRecords == 1){
        LOG_DEBUG("Single record in batch.");
        return exec_batch_one_by_one(connHolder, batch, current, nRecords, errMsgAddr);
    }
    if (nRecords == 0) nRecords = batch->nRecords;
    SqlCache* sqlCache = connection_holder_get_or_create_sql_cache_with_batch(connHolder, batch, nRecords);

    // Decide the parameter layout exactly once. batch->isSupportUnnest is
    // mutated further down (and possibly by callees), so re-reading it at
    // cleanup time could mis-identify who owns the params[i] buffers.
    const bool useUnnest = batch->isSupportUnnest && nRecords > 1;

    ActionStatus rc = SUCCESS;
    PGresult* res = NULL;
    int nParams = 0;
    char** params = NULL;
    dlist_mutable_iter miter;
    miter.cur = *current;
    RecordItem* recordItem;
    int64_t minSeq = INT64_MAX;
    int64_t maxSeq = INT64_MIN;

    if (useUnnest) {
        nParams = batch->nValues;
        params = MALLOC(nParams, char*);
        // NULL-fill so that cleanup may FREE every slot even if building the
        // parameters stops part-way through.
        for (int i = 0; i < nParams; i++) {
            params[i] = NULL;
        }
        int cParam = 0;
        for (int i = 0; i < batch->schema->nColumns; i++) {
            if (!batch->valuesSet[i]) continue;
            // For every column that has a value set, collect that column's
            // value (and its length) from each record.
            char** valueArray = MALLOC(nRecords, char*);
            int* lengthArray = MALLOC(nRecords, int);
            int cRecords = 0;
            dlist_foreach_from(miter, &(batch->recordList), *current) {
                if (cRecords >= nRecords) break;
                recordItem = dlist_container(RecordItem, list_node, miter.cur);
                lengthArray[cRecords] = recordItem->record->valueLengths[i];
                valueArray[cRecords++] = recordItem->record->values[i];
                minSeq = recordItem->record->sequence < minSeq ? recordItem->record->sequence : minSeq;
                maxSeq = recordItem->record->sequence > maxSeq ? recordItem->record->sequence : maxSeq;
            }
            // Turn the collected column into one array-valued parameter.
            int length = 0;
            int format = 0;
            char* ptr = NULL;
            int convertMode = get_convert_mode_for_unnest(batch, i);
            switch (convertMode)
            {
            case 1: {
                // Binary values written as a binary array.
                // NOTE(review): 20 + 4 per element presumably is the array
                // header plus a per-element length word - confirm against the
                // converter.
                const int valLen = get_val_len_by_type_oid(batch->schema->columns[i].type);
                length = 20 + 4 * nRecords;
                for (int j = 0; j < nRecords; j++) {
                    if (valueArray[j] != NULL) {
                        length += valLen;
                    }
                }
                ptr = MALLOC(length, char);
                unnest_convert_array_to_postgres_binary(ptr, valueArray, nRecords, valLen, batch->schema->columns[i].type);
                format = 1;
                break;
            }
            case 2:
                // Text values written as a binary array.
                length = 20 + 4 * nRecords;
                for (int j = 0; j < nRecords; j++) {
                    if (valueArray[j] != NULL) {
                        length += strlen(valueArray[j]);
                    }
                }
                ptr = MALLOC(length, char);
                convert_text_array_to_postgres_binary(ptr, valueArray, nRecords, batch->schema->columns[i].type);
                format = 1;
                break;
            case 3:
                // Text values written as a text array literal (assembled the
                // way pg jdbc does it).
                length = unnest_convert_array_to_text(&ptr, valueArray, lengthArray, nRecords);
                format = 0;
                break;
            default:
                // No parameter can be built for this column. Sending the
                // statement anyway would hand the server an unset pointer, so
                // release the temporaries and give up on this batch.
                LOG_ERROR("Unknown convertMode in unnest");
                FREE(valueArray);
                FREE(lengthArray);
                rc = FAILURE_NOT_NEED_RETRY;
                goto cleanup;
            }
            sqlCache->paramTypes[cParam] = get_array_oid_by_type_oid(batch->schema->columns[i].type);
            sqlCache->paramFormats[cParam] = format;
            params[cParam] = ptr;
            sqlCache->paramLengths[cParam++] = length;
            FREE(valueArray);
            FREE(lengthArray);
        }
    } else {
        // Flat layout: params[] only borrows the value pointers owned by the
        // records, so the individual entries must not be freed here.
        nParams = nRecords * batch->nValues;
        params = MALLOC(nParams, char*);
        int count = 0;
        int cRecords = 0;
        dlist_foreach_from(miter, &(batch->recordList), *current){
            if (cRecords >= nRecords) break;
            recordItem = dlist_container(RecordItem, list_node, miter.cur);
            for (int i = 0; i < batch->schema->nColumns; i++){
                if (!batch->valuesSet[i]) continue;
                params[count] = recordItem->record->values[i];
                sqlCache->paramLengths[count] = recordItem->record->valueLengths[i];
                count++;
            }
            minSeq = recordItem->record->sequence < minSeq ? recordItem->record->sequence : minSeq;
            maxSeq = recordItem->record->sequence > maxSeq ? recordItem->record->sequence : maxSeq;
            cRecords++;
        }
    }

    res = connection_holder_exec_params_with_retry(connHolder, sqlCache->command, nParams, sqlCache->paramTypes, (const char* const*)params, sqlCache->paramLengths, sqlCache->paramFormats, 0, errMsgAddr);
    if (res != NULL && PQresultStatus(res) == PGRES_COMMAND_OK) {
        LOG_DEBUG("Mutate into table \"%s\" as batch succeed, minSeq:%"PRId64", maxSeq:%"PRId64".", batch->schema->tableName->tableName, minSeq, maxSeq);
        metrics_meter_mark(connHolder->metrics->rps, nRecords);
        metrics_meter_mark(connHolder->metrics->qps, 1);
        *current = miter.cur;
        rc = SUCCESS;
        goto cleanup;
    }

    LOG_ERROR("Mutate into table \"%s\" as batch failed.", batch->schema->tableName->tableName);
    if (useUnnest && nRecords * batch->nValues <= MAX_PARAM_NUM) {
        // The flat layout fits within the parameter limit: disable unnest for
        // this batch and try the same records once more.
        LOG_WARN("Retrying once without unnest...");
        batch->isSupportUnnest = false;
        rc = exec_batch(connHolder, batch, current, nRecords, errMsgAddr);
    } else if (res != NULL && is_dirty_data_error(get_errcode_from_pg_res(res))) {
        // Dirty-data errors are retried record by record.
        LOG_WARN("Retrying one by one...");
        rc = exec_batch_one_by_one(connHolder, batch, current, nRecords, errMsgAddr);
    } else {
        // Not retried one by one: still invoke the user callback, but with a
        // NULL record and only the error message, so the callback can tell the
        // two situations apart by checking the record pointer.
        connHolder->handleExceptionByUser(NULL, PQresultErrorMessage(res), connHolder->exceptionHandlerParam);
        rc = FAILURE_NOT_NEED_RETRY;
    }

cleanup:
    if (res != NULL) PQclear(res);
    if (useUnnest) {
        // Only the unnest layout allocated the per-parameter buffers.
        for (int i = 0; i < nParams; i++) {
            FREE(params[i]);
        }
    }
    FREE(params);
    return rc;
}