in c/driver/postgresql/copy/reader.h [746:935]
static inline ArrowErrorCode MakeCopyFieldReader(
const PostgresType& pg_type, ArrowSchema* schema,
std::unique_ptr<PostgresCopyFieldReader>* out, ArrowError* error) {
ArrowSchemaView schema_view;
NANOARROW_RETURN_NOT_OK(ArrowSchemaViewInit(&schema_view, schema, nullptr));
switch (schema_view.type) {
case NANOARROW_TYPE_BOOL:
switch (pg_type.type_id()) {
case PostgresTypeId::kBool:
*out = std::make_unique<PostgresCopyBooleanFieldReader>();
return NANOARROW_OK;
default:
return ErrorCantConvert(error, pg_type, schema_view);
}
case NANOARROW_TYPE_INT16:
switch (pg_type.type_id()) {
case PostgresTypeId::kInt2:
*out = std::make_unique<PostgresCopyNetworkEndianFieldReader<int16_t>>();
return NANOARROW_OK;
default:
return ErrorCantConvert(error, pg_type, schema_view);
}
case NANOARROW_TYPE_INT32:
switch (pg_type.type_id()) {
case PostgresTypeId::kInt4:
case PostgresTypeId::kOid:
case PostgresTypeId::kRegproc:
*out = std::make_unique<PostgresCopyNetworkEndianFieldReader<int32_t>>();
return NANOARROW_OK;
default:
return ErrorCantConvert(error, pg_type, schema_view);
}
case NANOARROW_TYPE_INT64:
switch (pg_type.type_id()) {
case PostgresTypeId::kInt8:
case PostgresTypeId::kCash:
*out = std::make_unique<PostgresCopyNetworkEndianFieldReader<int64_t>>();
return NANOARROW_OK;
default:
return ErrorCantConvert(error, pg_type, schema_view);
}
case NANOARROW_TYPE_FLOAT:
switch (pg_type.type_id()) {
case PostgresTypeId::kFloat4:
*out = std::make_unique<PostgresCopyNetworkEndianFieldReader<uint32_t>>();
return NANOARROW_OK;
default:
return ErrorCantConvert(error, pg_type, schema_view);
}
case NANOARROW_TYPE_DOUBLE:
switch (pg_type.type_id()) {
case PostgresTypeId::kFloat8:
*out = std::make_unique<PostgresCopyNetworkEndianFieldReader<uint64_t>>();
return NANOARROW_OK;
default:
return ErrorCantConvert(error, pg_type, schema_view);
}
case NANOARROW_TYPE_STRING:
switch (pg_type.type_id()) {
case PostgresTypeId::kChar:
case PostgresTypeId::kVarchar:
case PostgresTypeId::kText:
case PostgresTypeId::kBpchar:
case PostgresTypeId::kName:
case PostgresTypeId::kEnum:
case PostgresTypeId::kJson:
*out = std::make_unique<PostgresCopyBinaryFieldReader>();
return NANOARROW_OK;
case PostgresTypeId::kNumeric:
*out = std::make_unique<PostgresCopyNumericFieldReader>();
return NANOARROW_OK;
case PostgresTypeId::kJsonb:
*out = std::make_unique<PostgresCopyJsonbFieldReader>();
return NANOARROW_OK;
default:
return ErrorCantConvert(error, pg_type, schema_view);
}
case NANOARROW_TYPE_BINARY:
// No need to check pg_type here: we can return the bytes of any
// Postgres type as binary.
*out = std::make_unique<PostgresCopyBinaryFieldReader>();
return NANOARROW_OK;
case NANOARROW_TYPE_LIST:
switch (pg_type.type_id()) {
case PostgresTypeId::kArray: {
if (pg_type.n_children() != 1) {
ArrowErrorSet(
error, "Expected Postgres array type to have one child but found %ld",
static_cast<long>(pg_type.n_children())); // NOLINT(runtime/int)
return EINVAL;
}
auto array_reader = std::make_unique<PostgresCopyArrayFieldReader>();
array_reader->Init(pg_type);
std::unique_ptr<PostgresCopyFieldReader> child_reader;
NANOARROW_RETURN_NOT_OK(MakeCopyFieldReader(
pg_type.child(0), schema->children[0], &child_reader, error));
array_reader->InitChild(std::move(child_reader));
*out = std::move(array_reader);
return NANOARROW_OK;
}
default:
return ErrorCantConvert(error, pg_type, schema_view);
}
case NANOARROW_TYPE_STRUCT:
switch (pg_type.type_id()) {
case PostgresTypeId::kRecord: {
if (pg_type.n_children() != schema->n_children) {
ArrowErrorSet(error,
"Can't convert Postgres record type with %ld chlidren to Arrow "
"struct type with %ld children",
static_cast<long>(pg_type.n_children()), // NOLINT(runtime/int)
static_cast<long>(schema->n_children)); // NOLINT(runtime/int)
return EINVAL;
}
auto record_reader = std::make_unique<PostgresCopyRecordFieldReader>();
record_reader->Init(pg_type);
for (int64_t i = 0; i < pg_type.n_children(); i++) {
std::unique_ptr<PostgresCopyFieldReader> child_reader;
NANOARROW_RETURN_NOT_OK(MakeCopyFieldReader(
pg_type.child(i), schema->children[i], &child_reader, error));
record_reader->AppendChild(std::move(child_reader));
}
*out = std::move(record_reader);
return NANOARROW_OK;
}
default:
return ErrorCantConvert(error, pg_type, schema_view);
}
case NANOARROW_TYPE_DATE32: {
// 2000-01-01
constexpr int32_t kPostgresDateEpoch = 10957;
*out = std::make_unique<
PostgresCopyNetworkEndianFieldReader<int32_t, kPostgresDateEpoch>>();
return NANOARROW_OK;
}
case NANOARROW_TYPE_TIME64: {
switch (pg_type.type_id()) {
case PostgresTypeId::kTime:
*out = std::make_unique<PostgresCopyNetworkEndianFieldReader<int64_t>>();
return NANOARROW_OK;
default:
return ErrorCantConvert(error, pg_type, schema_view);
}
}
case NANOARROW_TYPE_TIMESTAMP:
switch (pg_type.type_id()) {
case PostgresTypeId::kTimestamp:
case PostgresTypeId::kTimestamptz: {
// 2000-01-01 00:00:00.000000 in microseconds
constexpr int64_t kPostgresTimestampEpoch = 946684800000000;
*out = std::make_unique<
PostgresCopyNetworkEndianFieldReader<int64_t, kPostgresTimestampEpoch>>();
return NANOARROW_OK;
}
default:
return ErrorCantConvert(error, pg_type, schema_view);
}
case NANOARROW_TYPE_INTERVAL_MONTH_DAY_NANO:
switch (pg_type.type_id()) {
case PostgresTypeId::kInterval: {
*out = std::make_unique<PostgresCopyIntervalFieldReader>();
return NANOARROW_OK;
}
default:
return ErrorCantConvert(error, pg_type, schema_view);
}
default:
return ErrorCantConvert(error, pg_type, schema_view);
}
}