AdbcStatusCode PostgresStatement::CreateBulkTable()

in c/driver/postgresql/statement.cc [358:490]


// Prepare the target table for a bulk ingest.
//
// Builds the escaped, schema-qualified name of the target table and the
// escaped, comma-separated list of its columns, then (depending on
// ingest_.mode) drops and/or creates the table.
//
// current_schema:     schema used to qualify the table when neither a target
//                     schema nor the temporary option is configured.
// source_schema:      its children supply the column names and types.
// escaped_table:      appended with "<schema> . <table>" (identifiers escaped).
// escaped_field_list: appended with the escaped column names, ", "-separated.
// error:              populated only when a failure status is returned.
//
// Returns ADBC_STATUS_OK on success, ADBC_STATUS_INVALID_STATE when both a
// target schema and the temporary option are set, ADBC_STATUS_INTERNAL when an
// identifier cannot be escaped, and the status produced by SetError() when a
// DROP/CREATE statement fails. A failed Arrow-to-PostgreSQL type mapping
// returns through CHECK_NA_DETAIL (presumably ADBC_STATUS_INTERNAL, as
// requested by its first argument).
AdbcStatusCode PostgresStatement::CreateBulkTable(const std::string& current_schema,
                                                  const struct ArrowSchema& source_schema,
                                                  std::string* escaped_table,
                                                  std::string* escaped_field_list,
                                                  struct AdbcError* error) {
  PGconn* conn = connection_->conn();

  // A temporary table always lives in pg_temp, so an explicit schema conflicts.
  if (!ingest_.db_schema.empty() && ingest_.temporary) {
    SetError(error, "[libpq] Cannot set both %s and %s",
             ADBC_INGEST_OPTION_TARGET_DB_SCHEMA, ADBC_INGEST_OPTION_TEMPORARY);
    return ADBC_STATUS_INVALID_STATE;
  }

  // --- Schema qualifier -----------------------------------------------------
  if (!ingest_.db_schema.empty()) {
    char* escaped =
        PQescapeIdentifier(conn, ingest_.db_schema.c_str(), ingest_.db_schema.size());
    if (escaped == nullptr) {
      SetError(error, "[libpq] Failed to escape target schema %s for ingestion: %s",
               ingest_.db_schema.c_str(), PQerrorMessage(conn));
      return ADBC_STATUS_INTERNAL;
    }
    *escaped_table += escaped;
    *escaped_table += " . ";
    PQfreemem(escaped);
  } else if (ingest_.temporary) {
    // OK to be redundant (CREATE TEMPORARY TABLE pg_temp.foo)
    *escaped_table += "pg_temp . ";
  } else {
    // Explicitly specify the current schema to avoid any temporary tables
    // shadowing this table
    char* escaped =
        PQescapeIdentifier(conn, current_schema.c_str(), current_schema.size());
    // PQescapeIdentifier returns NULL on failure; appending a null char* to a
    // std::string is undefined behaviour, so report the failure instead.
    if (escaped == nullptr) {
      SetError(error, "[libpq] Failed to escape current schema %s for ingestion: %s",
               current_schema.c_str(), PQerrorMessage(conn));
      return ADBC_STATUS_INTERNAL;
    }
    *escaped_table += escaped;
    *escaped_table += " . ";
    PQfreemem(escaped);
  }

  // --- Table name -----------------------------------------------------------
  if (!ingest_.target.empty()) {
    char* escaped =
        PQescapeIdentifier(conn, ingest_.target.c_str(), ingest_.target.size());
    if (escaped == nullptr) {
      SetError(error, "[libpq] Failed to escape target table %s for ingestion: %s",
               ingest_.target.c_str(), PQerrorMessage(conn));
      return ADBC_STATUS_INTERNAL;
    }
    *escaped_table += escaped;
    PQfreemem(escaped);
  }

  // --- Statement prefix (and DROP for replace mode) ---------------------------
  std::string create = ingest_.temporary ? "CREATE TEMPORARY TABLE " : "CREATE TABLE ";

  switch (ingest_.mode) {
    case IngestMode::kCreate:
    case IngestMode::kAppend:
      // Nothing to do
      break;
    case IngestMode::kReplace: {
      // NOTE(review): the table is dropped before the column names/types below
      // have been validated, so a later failure leaves the table dropped.
      std::string drop = "DROP TABLE IF EXISTS " + *escaped_table;
      PGresult* result = PQexecParams(conn, drop.c_str(), /*nParams=*/0,
                                      /*paramTypes=*/nullptr, /*paramValues=*/nullptr,
                                      /*paramLengths=*/nullptr, /*paramFormats=*/nullptr,
                                      /*resultFormat=*/1 /*(binary)*/);
      if (PQresultStatus(result) != PGRES_COMMAND_OK) {
        AdbcStatusCode code =
            SetError(error, result, "[libpq] Failed to drop table: %s\nQuery was: %s",
                     PQerrorMessage(conn), drop.c_str());
        PQclear(result);
        return code;
      }
      PQclear(result);
      break;
    }
    case IngestMode::kCreateAppend:
      create += "IF NOT EXISTS ";
      break;
  }
  create += *escaped_table;
  create += " (";

  // --- Column definitions and field list --------------------------------------
  // The field list is built in every mode (including append) because it is an
  // output of this function, independent of whether a table gets created.
  for (int64_t i = 0; i < source_schema.n_children; i++) {
    if (i > 0) {
      create += ", ";
      *escaped_field_list += ", ";
    }

    // The Arrow C data interface permits a null name; treat it as an empty
    // name so that neither strlen() nor the %s below receives a null pointer.
    const char* raw_name = source_schema.children[i]->name;
    const char* unescaped = raw_name != nullptr ? raw_name : "";
    char* escaped = PQescapeIdentifier(conn, unescaped, std::strlen(unescaped));
    if (escaped == nullptr) {
      SetError(error, "[libpq] Failed to escape column %s for ingestion: %s", unescaped,
               PQerrorMessage(conn));
      return ADBC_STATUS_INTERNAL;
    }
    create += escaped;
    *escaped_field_list += escaped;
    // Free before CHECK_NA_DETAIL, which may return early.
    PQfreemem(escaped);

    PostgresType pg_type;
    struct ArrowError na_error;
    CHECK_NA_DETAIL(INTERNAL,
                    PostgresType::FromSchema(*type_resolver_, source_schema.children[i],
                                             &pg_type, &na_error),
                    &na_error, error);
    create += " " + pg_type.sql_type_name();
  }

  // Append mode targets an existing table: only the names were needed.
  if (ingest_.mode == IngestMode::kAppend) {
    return ADBC_STATUS_OK;
  }

  create += ")";
  // The error struct is deliberately left untouched here: it must only be
  // populated when a failure status is returned. The query text is included in
  // the failure message below.
  PGresult* result = PQexecParams(conn, create.c_str(), /*nParams=*/0,
                                  /*paramTypes=*/nullptr, /*paramValues=*/nullptr,
                                  /*paramLengths=*/nullptr, /*paramFormats=*/nullptr,
                                  /*resultFormat=*/1 /*(binary)*/);
  if (PQresultStatus(result) != PGRES_COMMAND_OK) {
    AdbcStatusCode code =
        SetError(error, result, "[libpq] Failed to create table: %s\nQuery was: %s",
                 PQerrorMessage(conn), create.c_str());
    PQclear(result);
    return code;
  }
  PQclear(result);
  return ADBC_STATUS_OK;
}