ActionStatus exec_batch()

in holo-client-c/src/worker.c [596:760]


ActionStatus exec_batch(ConnectionHolder* connHolder, Batch* batch, dlist_node** current, int nRecords, char** errMsgAddr){
    if (batch->nRecords <= 0){
        LOG_WARN("Nothing to insert.");
        return FAILURE_NOT_NEED_RETRY;
    }
    if (batch->nRecords == 1){
        LOG_DEBUG("Single record in batch.");
        return exec_batch_one_by_one(connHolder, batch, current, nRecords, errMsgAddr);
    }

    if (nRecords == 0) nRecords = batch->nRecords;
    SqlCache* sqlCache = connection_holder_get_or_create_sql_cache_with_batch(connHolder, batch, nRecords);
    int nParams;
    char** params;
    dlist_mutable_iter miter;
    miter.cur = *current;
    RecordItem* recordItem;
    int64_t minSeq = INT64_MAX;
    int64_t maxSeq = INT64_MIN;
    if (batch->isSupportUnnest && nRecords > 1) {
        nParams = batch->nValues;
        params = MALLOC(nParams, char*);
        int cParam = 0;
        for (int i = 0; i < batch->schema->nColumns; i++) {
            if (!batch->valuesSet[i]) continue;
            // 对于每个set value的列,创建数组
            char** valueArray = MALLOC(nRecords, char*);
            int* lengthArray = MALLOC(nRecords, int);
            // 遍历recordList,取对应位置的value填充数组
            int cRecords = 0;
            dlist_foreach_from(miter, &(batch->recordList), *current) {
                if (cRecords >= nRecords) break;
                recordItem = dlist_container(RecordItem, list_node, miter.cur);
                lengthArray[cRecords] = recordItem->record->valueLengths[i];
                valueArray[cRecords++] = recordItem->record->values[i];
                minSeq = recordItem->record->sequence < minSeq ? recordItem->record->sequence : minSeq;
                maxSeq = recordItem->record->sequence > maxSeq ? recordItem->record->sequence : maxSeq;
            }
            // 把这个数组变成一条record,插入param
            int length = 0;
            char* ptr = NULL;
            int convertMode = get_convert_mode_for_unnest(batch, i);

            switch (convertMode)
            {
            case 1:
                // binary转成binary写入
                length = 20 + 4 * nRecords;
                for (int j = 0; j < nRecords; j++) {
                    if (valueArray[j] == NULL) {
                        continue;
                    }
                    length += get_val_len_by_type_oid(batch->schema->columns[i].type);
                }
                ptr = MALLOC(length, char);
                unnest_convert_array_to_postgres_binary(ptr, valueArray, nRecords, get_val_len_by_type_oid(batch->schema->columns[i].type), batch->schema->columns[i].type);
                sqlCache->paramTypes[cParam] = get_array_oid_by_type_oid(batch->schema->columns[i].type);
                sqlCache->paramFormats[cParam] = 1;
                params[cParam] = ptr;
                sqlCache->paramLengths[cParam++] = length;
                FREE(valueArray);
                FREE(lengthArray);
                break;
            case 2:
                // text转成binary写入
                length = 20 + 4 * nRecords;
                for (int j = 0; j < nRecords; j++) {
                    if (valueArray[j] == NULL) {
                        continue;
                    }
                    length += strlen(valueArray[j]);
                }
                ptr = MALLOC(length, char);
                convert_text_array_to_postgres_binary(ptr, valueArray, nRecords, batch->schema->columns[i].type);
                sqlCache->paramTypes[cParam] = get_array_oid_by_type_oid(batch->schema->columns[i].type);
                sqlCache->paramFormats[cParam] = 1;
                params[cParam] = ptr;
                sqlCache->paramLengths[cParam++] = length;
                FREE(valueArray);
                FREE(lengthArray);
                break;
            case 3:
                // text转成text写入,参考pg jdbc的拼法
                length = unnest_convert_array_to_text(&ptr, valueArray, lengthArray, nRecords);
                sqlCache->paramTypes[cParam] = get_array_oid_by_type_oid(batch->schema->columns[i].type);
                sqlCache->paramFormats[cParam] = 0;
                params[cParam] = ptr;
                sqlCache->paramLengths[cParam++] = length;
                FREE(valueArray);
                FREE(lengthArray);
                break;
            default:
                LOG_ERROR("Unknown convertMode in unnest");
                break;
            }
        }
    } else {
        nParams = nRecords * batch->nValues;
        params = MALLOC(nParams, char*);
        int count = -1;
        int cRecords = 0;
        dlist_foreach_from(miter, &(batch->recordList), *current){
            if (cRecords >= nRecords) break;
            recordItem = dlist_container(RecordItem, list_node, miter.cur);
            for (int i = 0;i < batch->schema->nColumns;i++){
                if (!batch->valuesSet[i]) continue;
                params[++count] = recordItem->record->values[i];
                sqlCache->paramLengths[count] = recordItem->record->valueLengths[i];
            }
            minSeq = recordItem->record->sequence < minSeq ? recordItem->record->sequence : minSeq;
            maxSeq = recordItem->record->sequence > maxSeq ? recordItem->record->sequence : maxSeq;
            cRecords++;
        }
    }

    PGresult* res = NULL;
    res = connection_holder_exec_params_with_retry(connHolder, sqlCache->command, nParams, sqlCache->paramTypes, (const char* const*)params, sqlCache->paramLengths, sqlCache->paramFormats, 0, errMsgAddr);
    if (res == NULL || PQresultStatus(res) != PGRES_COMMAND_OK){
        LOG_ERROR("Mutate into table \"%s\" as batch failed.", batch->schema->tableName->tableName);

        ActionStatus rc = SUCCESS;
        if (batch->isSupportUnnest && nRecords > 1 && nRecords * batch->nValues <= MAX_PARAM_NUM) {
            LOG_WARN("Retrying once without unnest...");
            batch->isSupportUnnest = false;
            rc = exec_batch(connHolder, batch, current, nRecords, errMsgAddr);
            if (res != NULL) PQclear(res);
            for (int i = 0; i < nParams; i++) {
                FREE(params[i]);
            }
            FREE(params);
            return rc;
        }
        if (res != NULL && is_dirty_data_error(get_errcode_from_pg_res(res))) {
            LOG_WARN("Retrying one by one...");
            //脏数据类型的异常,需要拆成一条一条重试
            rc = exec_batch_one_by_one(connHolder, batch, current, nRecords, errMsgAddr);
        } else {
            //对于不one by one的情况,也要回调做异常处理,但是不提供record,只给出errMsg。用户可以通过record指针是否为NULL自行设计逻辑
            connHolder->handleExceptionByUser(NULL, PQresultErrorMessage(res), connHolder->exceptionHandlerParam);
            rc = FAILURE_NOT_NEED_RETRY;
        }
        if (res != NULL) PQclear(res);
        if (batch->isSupportUnnest && nRecords > 1) {
            for (int i = 0; i < nParams; i++) {
                FREE(params[i]);
            }
        }
        FREE(params);
        return rc;
    }
    LOG_DEBUG("Mutate into table \"%s\" as batch succeed, minSeq:%"PRId64", maxSeq:%"PRId64".", batch->schema->tableName->tableName, minSeq, maxSeq);

    metrics_meter_mark(connHolder->metrics->rps, nRecords);
    metrics_meter_mark(connHolder->metrics->qps, 1);
    if (res != NULL) PQclear(res);
    if (batch->isSupportUnnest && nRecords > 1) {
        for (int i = 0; i < nParams; i++) {
            FREE(params[i]);
        }
    }
    FREE(params);
    *current = miter.cur;

    return SUCCESS;
}