AdbcStatusCode PostgresStatement::CreateBulkTable()

in c/driver/postgresql/statement.cc [783:853]


// Create the bulk-ingestion target table from an Arrow schema.
//
// Builds a single `CREATE TABLE <ingest_.target> (<name> <type>, ...)` statement
// with one column per entry of `source_schema_fields` and executes it on
// `connection_`.
//
// Parameters:
//   source_schema         Supplies the column names: `children[i]->name` is read for
//                         every index `i` of `source_schema_fields`, so it must have
//                         at least that many children.
//   source_schema_fields  Parsed views of the same fields; supplies each column's
//                         Arrow type (and, for timestamps, the timezone string).
//   error                 Written only on the failure paths below.
//
// Returns:
//   ADBC_STATUS_OK               the server reported PGRES_COMMAND_OK.
//   ADBC_STATUS_NOT_IMPLEMENTED  a field's Arrow type has no mapping in the switch.
//   ADBC_STATUS_IO               the statement did not complete with PGRES_COMMAND_OK.
//
// NOTE(review): the table name and the column names are spliced into the SQL text
// verbatim, without identifier quoting. Names that need quoting (spaces, reserved
// words, mixed case) or that come from untrusted input will yield invalid or
// unintended SQL. Quoting here (e.g. via PQescapeIdentifier) would have to be
// matched by every other statement that references the same table -- confirm
// against the rest of the ingestion path before changing it.
AdbcStatusCode PostgresStatement::CreateBulkTable(
    const struct ArrowSchema& source_schema,
    const std::vector<struct ArrowSchemaView>& source_schema_fields,
    struct AdbcError* error) {
  std::string create = "CREATE TABLE ";
  create += ingest_.target;
  create += " (";

  for (size_t i = 0; i < source_schema_fields.size(); i++) {
    if (i > 0) create += ", ";
    create += source_schema.children[i]->name;
    switch (source_schema_fields[i].type) {
      case ArrowType::NANOARROW_TYPE_INT8:
      case ArrowType::NANOARROW_TYPE_INT16:
        // INT8 shares the SMALLINT mapping with INT16.
        create += " SMALLINT";
        break;
      case ArrowType::NANOARROW_TYPE_INT32:
        create += " INTEGER";
        break;
      case ArrowType::NANOARROW_TYPE_INT64:
        create += " BIGINT";
        break;
      case ArrowType::NANOARROW_TYPE_FLOAT:
        create += " REAL";
        break;
      case ArrowType::NANOARROW_TYPE_DOUBLE:
        create += " DOUBLE PRECISION";
        break;
      case ArrowType::NANOARROW_TYPE_STRING:
        create += " TEXT";
        break;
      case ArrowType::NANOARROW_TYPE_BINARY:
        create += " BYTEA";
        break;
      case ArrowType::NANOARROW_TYPE_DATE32:
        create += " DATE";
        break;
      case ArrowType::NANOARROW_TYPE_TIMESTAMP: {
        // A non-empty timezone selects TIMESTAMPTZ; an empty one selects TIMESTAMP.
        // A null pointer is treated like an empty string rather than dereferenced.
        const char* timezone = source_schema_fields[i].timezone;
        const bool has_timezone = timezone != nullptr && timezone[0] != '\0';
        create += has_timezone ? " TIMESTAMPTZ" : " TIMESTAMP";
        break;
      }
      case ArrowType::NANOARROW_TYPE_INTERVAL_MONTH_DAY_NANO:
        create += " INTERVAL";
        break;
      default:
        // Field numbers in the message are 1-based.
        SetError(error, "%s%" PRIu64 "%s%s%s%s", "[libpq] Field #",
                 static_cast<uint64_t>(i + 1), " ('", source_schema.children[i]->name,
                 "') has unsupported type for ingestion ",
                 ArrowTypeString(source_schema_fields[i].type));
        return ADBC_STATUS_NOT_IMPLEMENTED;
    }
  }

  create += ")";
  // `error` is deliberately left untouched here: it is only written once a failure
  // has actually been observed, so a successful call does not hand the caller a
  // populated error alongside ADBC_STATUS_OK.
  PGresult* result = PQexecParams(connection_->conn(), create.c_str(), /*nParams=*/0,
                                  /*paramTypes=*/nullptr, /*paramValues=*/nullptr,
                                  /*paramLengths=*/nullptr, /*paramFormats=*/nullptr,
                                  /*resultFormat=*/1 /*(binary)*/);
  if (PQresultStatus(result) != PGRES_COMMAND_OK) {
    SetError(error, "[libpq] Failed to create table: %s\nQuery was: %s",
             PQerrorMessage(connection_->conn()), create.c_str());
    PQclear(result);
    return ADBC_STATUS_IO;
  }
  PQclear(result);
  return ADBC_STATUS_OK;
}