in c/driver/postgresql/statement.cc [783:853]
/// Build and execute a CREATE TABLE statement for the bulk-ingest target.
///
/// One column is emitted per entry of `source_schema_fields`; the column name
/// is taken from the matching child of `source_schema` and the column type is
/// derived from the Arrow type of the field.
///
/// \param[in] source_schema Schema whose children supply the column names.
/// \param[in] source_schema_fields Parsed views of the fields, indexed in
///   parallel with `source_schema.children`.
/// \param[out] error Populated only when a non-OK status is returned.
/// \return ADBC_STATUS_OK when the server reports PGRES_COMMAND_OK,
///   ADBC_STATUS_NOT_IMPLEMENTED when a field has an Arrow type with no
///   mapping below, ADBC_STATUS_IO when executing the statement fails.
AdbcStatusCode PostgresStatement::CreateBulkTable(
    const struct ArrowSchema& source_schema,
    const std::vector<struct ArrowSchemaView>& source_schema_fields,
    struct AdbcError* error) {
  // NOTE(review): the table name and the column names are appended verbatim
  // (no identifier quoting/escaping). Confirm that callers only pass
  // identifiers that are safe to splice into SQL.
  std::string create = "CREATE TABLE ";
  create += ingest_.target;
  create += " (";

  for (size_t i = 0; i < source_schema_fields.size(); i++) {
    if (i > 0) create += ", ";
    create += source_schema.children[i]->name;

    switch (source_schema_fields[i].type) {
      // INT8 is widened to SMALLINT alongside INT16.
      case ArrowType::NANOARROW_TYPE_INT8:
      case ArrowType::NANOARROW_TYPE_INT16:
        create += " SMALLINT";
        break;
      case ArrowType::NANOARROW_TYPE_INT32:
        create += " INTEGER";
        break;
      case ArrowType::NANOARROW_TYPE_INT64:
        create += " BIGINT";
        break;
      case ArrowType::NANOARROW_TYPE_FLOAT:
        create += " REAL";
        break;
      case ArrowType::NANOARROW_TYPE_DOUBLE:
        create += " DOUBLE PRECISION";
        break;
      case ArrowType::NANOARROW_TYPE_STRING:
        create += " TEXT";
        break;
      case ArrowType::NANOARROW_TYPE_BINARY:
        create += " BYTEA";
        break;
      case ArrowType::NANOARROW_TYPE_DATE32:
        create += " DATE";
        break;
      case ArrowType::NANOARROW_TYPE_TIMESTAMP:
        // strcmp() is non-zero when the timezone string is not empty, i.e.
        // the field carries a timezone.
        if (strcmp("", source_schema_fields[i].timezone)) {
          create += " TIMESTAMPTZ";
        } else {
          create += " TIMESTAMP";
        }
        break;
      case ArrowType::NANOARROW_TYPE_INTERVAL_MONTH_DAY_NANO:
        create += " INTERVAL";
        break;
      default:
        // Field numbers in the message are 1-based.
        SetError(error, "%s%" PRIu64 "%s%s%s%s", "[libpq] Field #",
                 static_cast<uint64_t>(i + 1), " ('", source_schema.children[i]->name,
                 "') has unsupported type for ingestion ",
                 ArrowTypeString(source_schema_fields[i].type));
        return ADBC_STATUS_NOT_IMPLEMENTED;
    }
  }
  create += ")";

  // The error is intentionally left untouched here: it must only be set on
  // failure, and the failure branch below already includes the query text.
  PGresult* result = PQexecParams(connection_->conn(), create.c_str(), /*nParams=*/0,
                                  /*paramTypes=*/nullptr, /*paramValues=*/nullptr,
                                  /*paramLengths=*/nullptr, /*paramFormats=*/nullptr,
                                  /*resultFormat=*/1 /*(binary)*/);
  if (PQresultStatus(result) != PGRES_COMMAND_OK) {
    SetError(error, "[libpq] Failed to create table: %s\nQuery was: %s",
             PQerrorMessage(connection_->conn()), create.c_str());
    PQclear(result);
    return ADBC_STATUS_IO;
  }
  PQclear(result);
  return ADBC_STATUS_OK;
}