in c/driver/postgresql/statement.cc [358:490]
AdbcStatusCode PostgresStatement::CreateBulkTable(const std::string& current_schema,
const struct ArrowSchema& source_schema,
std::string* escaped_table,
std::string* escaped_field_list,
struct AdbcError* error) {
PGconn* conn = connection_->conn();
if (!ingest_.db_schema.empty() && ingest_.temporary) {
SetError(error, "[libpq] Cannot set both %s and %s",
ADBC_INGEST_OPTION_TARGET_DB_SCHEMA, ADBC_INGEST_OPTION_TEMPORARY);
return ADBC_STATUS_INVALID_STATE;
}
{
if (!ingest_.db_schema.empty()) {
char* escaped =
PQescapeIdentifier(conn, ingest_.db_schema.c_str(), ingest_.db_schema.size());
if (escaped == nullptr) {
SetError(error, "[libpq] Failed to escape target schema %s for ingestion: %s",
ingest_.db_schema.c_str(), PQerrorMessage(conn));
return ADBC_STATUS_INTERNAL;
}
*escaped_table += escaped;
*escaped_table += " . ";
PQfreemem(escaped);
} else if (ingest_.temporary) {
// OK to be redundant (CREATE TEMPORARY TABLE pg_temp.foo)
*escaped_table += "pg_temp . ";
} else {
// Explicitly specify the current schema to avoid any temporary tables
// shadowing this table
char* escaped =
PQescapeIdentifier(conn, current_schema.c_str(), current_schema.size());
*escaped_table += escaped;
*escaped_table += " . ";
PQfreemem(escaped);
}
if (!ingest_.target.empty()) {
char* escaped =
PQescapeIdentifier(conn, ingest_.target.c_str(), ingest_.target.size());
if (escaped == nullptr) {
SetError(error, "[libpq] Failed to escape target table %s for ingestion: %s",
ingest_.target.c_str(), PQerrorMessage(conn));
return ADBC_STATUS_INTERNAL;
}
*escaped_table += escaped;
PQfreemem(escaped);
}
}
std::string create;
if (ingest_.temporary) {
create = "CREATE TEMPORARY TABLE ";
} else {
create = "CREATE TABLE ";
}
switch (ingest_.mode) {
case IngestMode::kCreate:
case IngestMode::kAppend:
// Nothing to do
break;
case IngestMode::kReplace: {
std::string drop = "DROP TABLE IF EXISTS " + *escaped_table;
PGresult* result = PQexecParams(conn, drop.c_str(), /*nParams=*/0,
/*paramTypes=*/nullptr, /*paramValues=*/nullptr,
/*paramLengths=*/nullptr, /*paramFormats=*/nullptr,
/*resultFormat=*/1 /*(binary)*/);
if (PQresultStatus(result) != PGRES_COMMAND_OK) {
AdbcStatusCode code =
SetError(error, result, "[libpq] Failed to drop table: %s\nQuery was: %s",
PQerrorMessage(conn), drop.c_str());
PQclear(result);
return code;
}
PQclear(result);
break;
}
case IngestMode::kCreateAppend:
create += "IF NOT EXISTS ";
break;
}
create += *escaped_table;
create += " (";
for (int64_t i = 0; i < source_schema.n_children; i++) {
if (i > 0) {
create += ", ";
*escaped_field_list += ", ";
}
const char* unescaped = source_schema.children[i]->name;
char* escaped = PQescapeIdentifier(conn, unescaped, std::strlen(unescaped));
if (escaped == nullptr) {
SetError(error, "[libpq] Failed to escape column %s for ingestion: %s", unescaped,
PQerrorMessage(conn));
return ADBC_STATUS_INTERNAL;
}
create += escaped;
*escaped_field_list += escaped;
PQfreemem(escaped);
PostgresType pg_type;
struct ArrowError na_error;
CHECK_NA_DETAIL(INTERNAL,
PostgresType::FromSchema(*type_resolver_, source_schema.children[i],
&pg_type, &na_error),
&na_error, error);
create += " " + pg_type.sql_type_name();
}
if (ingest_.mode == IngestMode::kAppend) {
return ADBC_STATUS_OK;
}
create += ")";
SetError(error, "%s%s", "[libpq] ", create.c_str());
PGresult* result = PQexecParams(conn, create.c_str(), /*nParams=*/0,
/*paramTypes=*/nullptr, /*paramValues=*/nullptr,
/*paramLengths=*/nullptr, /*paramFormats=*/nullptr,
/*resultFormat=*/1 /*(binary)*/);
if (PQresultStatus(result) != PGRES_COMMAND_OK) {
AdbcStatusCode code =
SetError(error, result, "[libpq] Failed to create table: %s\nQuery was: %s",
PQerrorMessage(conn), create.c_str());
PQclear(result);
return code;
}
PQclear(result);
return ADBC_STATUS_OK;
}