c/driver/postgresql/postgres_copy_reader.h (798 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. #pragma once #include <algorithm> #include <cerrno> #include <cinttypes> #include <cstdint> #include <memory> #include <string> #include <utility> #include <vector> #include <nanoarrow/nanoarrow.hpp> #include "postgres_type.h" #include "postgres_util.h" #include "vendor/portable-snippets/safe-math.h" // R 3.6 / Windows builds on a very old toolchain that does not define ENODATA #if defined(_WIN32) && !defined(MSVC) && !defined(ENODATA) #define ENODATA 120 #endif namespace adbcpq { // "PGCOPY\n\377\r\n\0" static int8_t kPgCopyBinarySignature[] = {0x50, 0x47, 0x43, 0x4F, 0x50, 0x59, 0x0A, static_cast<int8_t>(0xFF), 0x0D, 0x0A, 0x00}; // Read a value from the buffer without checking the buffer size. Advances // the cursor of data and reduces its size by sizeof(T). template <typename T> inline T ReadUnsafe(ArrowBufferView* data) { T out; memcpy(&out, data->data.data, sizeof(T)); out = SwapNetworkToHost(out); data->data.as_uint8 += sizeof(T); data->size_bytes -= sizeof(T); return out; } // Define some explicit specializations for types that don't have a SwapNetworkToHost // overload. template <> inline int8_t ReadUnsafe(ArrowBufferView* data) { int8_t out = data->data.as_int8[0]; data->data.as_uint8 += sizeof(int8_t); data->size_bytes -= sizeof(int8_t); return out; } template <> inline int16_t ReadUnsafe(ArrowBufferView* data) { return static_cast<int16_t>(ReadUnsafe<uint16_t>(data)); } template <> inline int32_t ReadUnsafe(ArrowBufferView* data) { return static_cast<int32_t>(ReadUnsafe<uint32_t>(data)); } template <> inline int64_t ReadUnsafe(ArrowBufferView* data) { return static_cast<int64_t>(ReadUnsafe<uint64_t>(data)); } template <typename T> ArrowErrorCode ReadChecked(ArrowBufferView* data, T* out, ArrowError* error) { if (data->size_bytes < static_cast<int64_t>(sizeof(T))) { ArrowErrorSet(error, "Unexpected end of input (expected %d bytes but found %ld)", static_cast<int>(sizeof(T)), static_cast<long>(data->size_bytes)); // NOLINT(runtime/int) return EINVAL; } *out = ReadUnsafe<T>(data); return NANOARROW_OK; } class PostgresCopyFieldReader { public: PostgresCopyFieldReader() : validity_(nullptr), offsets_(nullptr), data_(nullptr) { memset(&schema_view_, 0, sizeof(ArrowSchemaView)); } virtual ~PostgresCopyFieldReader() {} void Init(const PostgresType& pg_type) { pg_type_ = pg_type; } const PostgresType& InputType() const { return pg_type_; } virtual ArrowErrorCode InitSchema(ArrowSchema* schema) { NANOARROW_RETURN_NOT_OK(ArrowSchemaViewInit(&schema_view_, schema, nullptr)); return NANOARROW_OK; } virtual ArrowErrorCode InitArray(ArrowArray* array) { // Cache some buffer pointers validity_ = ArrowArrayValidityBitmap(array); for (int32_t i = 0; i < 3; i++) { switch (schema_view_.layout.buffer_type[i]) { case NANOARROW_BUFFER_TYPE_DATA_OFFSET: if (schema_view_.layout.element_size_bits[i] == 32) { offsets_ = ArrowArrayBuffer(array, i); } break; case NANOARROW_BUFFER_TYPE_DATA: data_ = ArrowArrayBuffer(array, i); break; default: break; } } return NANOARROW_OK; } virtual ArrowErrorCode Read(ArrowBufferView* data, int32_t field_size_bytes, ArrowArray* array, ArrowError* error) { return ENOTSUP; } virtual ArrowErrorCode FinishArray(ArrowArray* array, ArrowError* error) { return NANOARROW_OK; } protected: PostgresType pg_type_; ArrowSchemaView schema_view_; ArrowBitmap* validity_; ArrowBuffer* offsets_; ArrowBuffer* data_; std::vector<std::unique_ptr<PostgresCopyFieldReader>> children_; ArrowErrorCode AppendValid(ArrowArray* array) { if (validity_->buffer.data != nullptr) { NANOARROW_RETURN_NOT_OK(ArrowBitmapAppend(validity_, true, 1)); } array->length++; return NANOARROW_OK; } }; // Reader for a Postgres boolean (one byte -> bitmap) class PostgresCopyBooleanFieldReader : public PostgresCopyFieldReader { public: ArrowErrorCode Read(ArrowBufferView* data, int32_t field_size_bytes, ArrowArray* array, ArrowError* error) override { if (field_size_bytes <= 0) { return ArrowArrayAppendNull(array, 1); } if (field_size_bytes != 1) { ArrowErrorSet(error, "Expected field with one byte but found field with %d bytes", static_cast<int>(field_size_bytes)); // NOLINT(runtime/int) return EINVAL; } int64_t bytes_required = _ArrowBytesForBits(array->length + 1); if (bytes_required > data_->size_bytes) { NANOARROW_RETURN_NOT_OK( ArrowBufferAppendFill(data_, 0, bytes_required - data_->size_bytes)); } if (ReadUnsafe<int8_t>(data)) { ArrowBitSet(data_->data, array->length); } else { ArrowBitClear(data_->data, array->length); } return AppendValid(array); } }; // Reader for Pg->Arrow conversions whose representations are identical minus // the bswap from network endian. This includes all integral and float types. template <typename T, T kOffset = 0> class PostgresCopyNetworkEndianFieldReader : public PostgresCopyFieldReader { public: ArrowErrorCode Read(ArrowBufferView* data, int32_t field_size_bytes, ArrowArray* array, ArrowError* error) override { if (field_size_bytes <= 0) { return ArrowArrayAppendNull(array, 1); } if (field_size_bytes != static_cast<int32_t>(sizeof(T))) { ArrowErrorSet(error, "Expected field with %d bytes but found field with %d bytes", static_cast<int>(sizeof(T)), static_cast<int>(field_size_bytes)); // NOLINT(runtime/int) return EINVAL; } T value = kOffset + ReadUnsafe<T>(data); NANOARROW_RETURN_NOT_OK(ArrowBufferAppend(data_, &value, sizeof(T))); return AppendValid(array); } }; // Reader for Intervals class PostgresCopyIntervalFieldReader : public PostgresCopyFieldReader { public: ArrowErrorCode Read(ArrowBufferView* data, int32_t field_size_bytes, ArrowArray* array, ArrowError* error) override { if (field_size_bytes <= 0) { return ArrowArrayAppendNull(array, 1); } if (field_size_bytes != 16) { ArrowErrorSet(error, "Expected field with %d bytes but found field with %d bytes", 16, static_cast<int>(field_size_bytes)); // NOLINT(runtime/int) return EINVAL; } // postgres stores time as usec, arrow stores as ns const int64_t time_usec = ReadUnsafe<int64_t>(data); int64_t time; if (!psnip_safe_int64_mul(&time, time_usec, 1000)) { ArrowErrorSet(error, "[libpq] Interval with time value %" PRId64 " usec would overflow when converting to nanoseconds", time_usec); return EINVAL; } const int32_t days = ReadUnsafe<int32_t>(data); const int32_t months = ReadUnsafe<int32_t>(data); NANOARROW_RETURN_NOT_OK(ArrowBufferAppend(data_, &months, sizeof(int32_t))); NANOARROW_RETURN_NOT_OK(ArrowBufferAppend(data_, &days, sizeof(int32_t))); NANOARROW_RETURN_NOT_OK(ArrowBufferAppend(data_, &time, sizeof(int64_t))); return AppendValid(array); } }; // // Converts COPY resulting from the Postgres NUMERIC type into a string. // Rewritten based on the Postgres implementation of NUMERIC cast to string in // src/backend/utils/adt/numeric.c : get_str_from_var() (Note that in the initial source, // DEC_DIGITS is always 4 and DBASE is always 10000). // // Briefly, the Postgres representation of "numeric" is an array of int16_t ("digits") // from most significant to least significant. Each "digit" is a value between 0000 and // 9999. There are weight + 1 digits before the decimal point and dscale digits after the // decimal point. Both of those values can be zero or negative. A "sign" component // encodes the positive or negativeness of the value and is also used to encode special // values (inf, -inf, and nan). class PostgresCopyNumericFieldReader : public PostgresCopyFieldReader { public: ArrowErrorCode Read(ArrowBufferView* data, int32_t field_size_bytes, ArrowArray* array, ArrowError* error) override { // -1 for NULL if (field_size_bytes < 0) { return ArrowArrayAppendNull(array, 1); } // Read the input if (data->size_bytes < static_cast<int64_t>(4 * sizeof(int16_t))) { ArrowErrorSet(error, "Expected at least %d bytes of field data for numeric copy data but " "only %d bytes of input remain", static_cast<int>(4 * sizeof(int16_t)), static_cast<int>(data->size_bytes)); // NOLINT(runtime/int) return EINVAL; } int16_t ndigits = ReadUnsafe<int16_t>(data); int16_t weight = ReadUnsafe<int16_t>(data); uint16_t sign = ReadUnsafe<uint16_t>(data); uint16_t dscale = ReadUnsafe<uint16_t>(data); if (data->size_bytes < static_cast<int64_t>(ndigits * sizeof(int16_t))) { ArrowErrorSet(error, "Expected at least %d bytes of field data for numeric digits copy " "data but only %d bytes of input remain", static_cast<int>(ndigits * sizeof(int16_t)), static_cast<int>(data->size_bytes)); // NOLINT(runtime/int) return EINVAL; } digits_.clear(); for (int16_t i = 0; i < ndigits; i++) { digits_.push_back(ReadUnsafe<int16_t>(data)); } // Handle special values std::string special_value; switch (sign) { case kNumericNAN: special_value = std::string("nan"); break; case kNumericPinf: special_value = std::string("inf"); break; case kNumericNinf: special_value = std::string("-inf"); break; case kNumericPos: case kNumericNeg: special_value = std::string(""); break; default: ArrowErrorSet(error, "Unexpected value for sign read from Postgres numeric field: %d", static_cast<int>(sign)); return EINVAL; } if (!special_value.empty()) { NANOARROW_RETURN_NOT_OK( ArrowBufferAppend(data_, special_value.data(), special_value.size())); NANOARROW_RETURN_NOT_OK(ArrowBufferAppendInt32(offsets_, data_->size_bytes)); return AppendValid(array); } // Calculate string space requirement int64_t max_chars_required = std::max<int64_t>(1, (weight + 1) * kDecDigits); max_chars_required += dscale + kDecDigits + 2; NANOARROW_RETURN_NOT_OK(ArrowBufferReserve(data_, max_chars_required)); char* out0 = reinterpret_cast<char*>(data_->data + data_->size_bytes); char* out = out0; // Build output string in-place, starting with the negative sign if (sign == kNumericNeg) { *out++ = '-'; } // ...then digits before the decimal point int d; int d1; int16_t dig; if (weight < 0) { d = weight + 1; *out++ = '0'; } else { for (d = 0; d <= weight; d++) { if (d < ndigits) { dig = digits_[d]; } else { dig = 0; } // To strip leading zeroes int append = (d > 0); for (const auto pow10 : {1000, 100, 10, 1}) { d1 = dig / pow10; dig -= d1 * pow10; append |= (d1 > 0); if (append) { *out++ = d1 + '0'; } } } } // ...then the decimal point + digits after it. This may write more digits // than specified by dscale so we need to keep track of how many we want to // keep here. int64_t actual_chars_required = out - out0; if (dscale > 0) { *out++ = '.'; actual_chars_required += dscale + 1; for (int i = 0; i < dscale; i++, d++, i += kDecDigits) { if (d >= 0 && d < ndigits) { dig = digits_[d]; } else { dig = 0; } for (const auto pow10 : {1000, 100, 10, 1}) { d1 = dig / pow10; dig -= d1 * pow10; *out++ = d1 + '0'; } } } // Update data buffer size and add offsets data_->size_bytes += actual_chars_required; NANOARROW_RETURN_NOT_OK(ArrowBufferAppendInt32(offsets_, data_->size_bytes)); return AppendValid(array); } private: std::vector<int16_t> digits_; // Number of decimal digits per Postgres digit static const int kDecDigits = 4; // The "base" of the Postgres representation (i.e., each "digit" is 0 to 9999) static const int kNBase = 10000; // Valid values for the sign component static const uint16_t kNumericPos = 0x0000; static const uint16_t kNumericNeg = 0x4000; static const uint16_t kNumericNAN = 0xC000; static const uint16_t kNumericPinf = 0xD000; static const uint16_t kNumericNinf = 0xF000; }; // Reader for Pg->Arrow conversions whose Arrow representation is simply the // bytes of the field representation. This can be used with binary and string // Arrow types and any Postgres type. class PostgresCopyBinaryFieldReader : public PostgresCopyFieldReader { public: ArrowErrorCode Read(ArrowBufferView* data, int32_t field_size_bytes, ArrowArray* array, ArrowError* error) override { // -1 for NULL (0 would be empty string) if (field_size_bytes < 0) { return ArrowArrayAppendNull(array, 1); } if (field_size_bytes > data->size_bytes) { ArrowErrorSet(error, "Expected %d bytes of field data but got %d bytes of input", static_cast<int>(field_size_bytes), static_cast<int>(data->size_bytes)); // NOLINT(runtime/int) return EINVAL; } NANOARROW_RETURN_NOT_OK(ArrowBufferAppend(data_, data->data.data, field_size_bytes)); data->data.as_uint8 += field_size_bytes; data->size_bytes -= field_size_bytes; int32_t* offsets = reinterpret_cast<int32_t*>(offsets_->data); NANOARROW_RETURN_NOT_OK( ArrowBufferAppendInt32(offsets_, offsets[array->length] + field_size_bytes)); return AppendValid(array); } }; class PostgresCopyArrayFieldReader : public PostgresCopyFieldReader { public: void InitChild(std::unique_ptr<PostgresCopyFieldReader> child) { child_ = std::move(child); child_->Init(pg_type_.child(0)); } ArrowErrorCode InitSchema(ArrowSchema* schema) override { NANOARROW_RETURN_NOT_OK(PostgresCopyFieldReader::InitSchema(schema)); NANOARROW_RETURN_NOT_OK(child_->InitSchema(schema->children[0])); return NANOARROW_OK; } ArrowErrorCode InitArray(ArrowArray* array) override { NANOARROW_RETURN_NOT_OK(PostgresCopyFieldReader::InitArray(array)); NANOARROW_RETURN_NOT_OK(child_->InitArray(array->children[0])); return NANOARROW_OK; } ArrowErrorCode Read(ArrowBufferView* data, int32_t field_size_bytes, ArrowArray* array, ArrowError* error) override { if (field_size_bytes <= 0) { return ArrowArrayAppendNull(array, 1); } // Keep the cursor where we start to parse the array so we can check // the number of bytes read against the field size when finished const uint8_t* data0 = data->data.as_uint8; int32_t n_dim; NANOARROW_RETURN_NOT_OK(ReadChecked<int32_t>(data, &n_dim, error)); int32_t flags; NANOARROW_RETURN_NOT_OK(ReadChecked<int32_t>(data, &flags, error)); uint32_t element_type_oid; NANOARROW_RETURN_NOT_OK(ReadChecked<uint32_t>(data, &element_type_oid, error)); // We could validate the OID here, but this is a poor fit for all cases // (e.g. testing) since the OID can be specific to each database if (n_dim < 0) { ArrowErrorSet(error, "Expected array n_dim > 0 but got %d", static_cast<int>(n_dim)); // NOLINT(runtime/int) return EINVAL; } // This is apparently allowed if (n_dim == 0) { NANOARROW_RETURN_NOT_OK(ArrowArrayFinishElement(array)); return NANOARROW_OK; } int64_t n_items = 1; for (int32_t i = 0; i < n_dim; i++) { int32_t dim_size; NANOARROW_RETURN_NOT_OK(ReadChecked<int32_t>(data, &dim_size, error)); n_items *= dim_size; int32_t lower_bound; NANOARROW_RETURN_NOT_OK(ReadChecked<int32_t>(data, &lower_bound, error)); if (lower_bound != 1) { ArrowErrorSet(error, "Array value with lower bound != 1 is not supported"); return EINVAL; } } for (int64_t i = 0; i < n_items; i++) { int32_t child_field_size_bytes; NANOARROW_RETURN_NOT_OK(ReadChecked<int32_t>(data, &child_field_size_bytes, error)); NANOARROW_RETURN_NOT_OK( child_->Read(data, child_field_size_bytes, array->children[0], error)); } int64_t bytes_read = data->data.as_uint8 - data0; if (bytes_read != field_size_bytes) { ArrowErrorSet(error, "Expected to read %d bytes from array field but read %d bytes", static_cast<int>(field_size_bytes), static_cast<int>(bytes_read)); // NOLINT(runtime/int) return EINVAL; } NANOARROW_RETURN_NOT_OK(ArrowArrayFinishElement(array)); return NANOARROW_OK; } private: std::unique_ptr<PostgresCopyFieldReader> child_; }; class PostgresCopyRecordFieldReader : public PostgresCopyFieldReader { public: void AppendChild(std::unique_ptr<PostgresCopyFieldReader> child) { int64_t child_i = static_cast<int64_t>(children_.size()); children_.push_back(std::move(child)); children_[child_i]->Init(pg_type_.child(child_i)); } ArrowErrorCode InitSchema(ArrowSchema* schema) override { NANOARROW_RETURN_NOT_OK(PostgresCopyFieldReader::InitSchema(schema)); for (int64_t i = 0; i < schema->n_children; i++) { NANOARROW_RETURN_NOT_OK(children_[i]->InitSchema(schema->children[i])); } return NANOARROW_OK; } ArrowErrorCode InitArray(ArrowArray* array) override { NANOARROW_RETURN_NOT_OK(PostgresCopyFieldReader::InitArray(array)); for (int64_t i = 0; i < array->n_children; i++) { NANOARROW_RETURN_NOT_OK(children_[i]->InitArray(array->children[i])); } return NANOARROW_OK; } ArrowErrorCode Read(ArrowBufferView* data, int32_t field_size_bytes, ArrowArray* array, ArrowError* error) override { if (field_size_bytes < 0) { return ArrowArrayAppendNull(array, 1); } // Keep the cursor where we start to parse the field so we can check // the number of bytes read against the field size when finished const uint8_t* data0 = data->data.as_uint8; int32_t n_fields; NANOARROW_RETURN_NOT_OK(ReadChecked<int32_t>(data, &n_fields, error)); if (n_fields != array->n_children) { ArrowErrorSet(error, "Expected nested record type to have %ld fields but got %d", static_cast<long>(array->n_children), // NOLINT(runtime/int) static_cast<int>(n_fields)); // NOLINT(runtime/int) return EINVAL; } for (int32_t i = 0; i < n_fields; i++) { uint32_t child_oid; NANOARROW_RETURN_NOT_OK(ReadChecked<uint32_t>(data, &child_oid, error)); int32_t child_field_size_bytes; NANOARROW_RETURN_NOT_OK(ReadChecked<int32_t>(data, &child_field_size_bytes, error)); int result = children_[i]->Read(data, child_field_size_bytes, array->children[i], error); // On overflow, pretend all previous children for this struct were never // appended to. This leaves array in a valid state in the specific case // where EOVERFLOW was returned so that a higher level caller can attempt // to try again after creating a new array. if (result == EOVERFLOW) { for (int16_t j = 0; j < i; j++) { array->children[j]->length--; } } if (result != NANOARROW_OK) { return result; } } // field size == -1 means don't check (e.g., for a top-level row tuple) int64_t bytes_read = data->data.as_uint8 - data0; if (field_size_bytes != -1 && bytes_read != field_size_bytes) { ArrowErrorSet(error, "Expected to read %d bytes from record field but read %d bytes", static_cast<int>(field_size_bytes), static_cast<int>(bytes_read)); // NOLINT(runtime/int) return EINVAL; } array->length++; return NANOARROW_OK; } private: std::vector<std::unique_ptr<PostgresCopyFieldReader>> children_; }; // Subtely different from a Record field item: field count is an int16_t // instead of an int32_t and each field is not prefixed by its OID. class PostgresCopyFieldTupleReader : public PostgresCopyFieldReader { public: void AppendChild(std::unique_ptr<PostgresCopyFieldReader> child) { int64_t child_i = static_cast<int64_t>(children_.size()); children_.push_back(std::move(child)); children_[child_i]->Init(pg_type_.child(child_i)); } ArrowErrorCode InitSchema(ArrowSchema* schema) override { NANOARROW_RETURN_NOT_OK(PostgresCopyFieldReader::InitSchema(schema)); for (int64_t i = 0; i < schema->n_children; i++) { NANOARROW_RETURN_NOT_OK(children_[i]->InitSchema(schema->children[i])); } return NANOARROW_OK; } ArrowErrorCode InitArray(ArrowArray* array) override { NANOARROW_RETURN_NOT_OK(PostgresCopyFieldReader::InitArray(array)); for (int64_t i = 0; i < array->n_children; i++) { NANOARROW_RETURN_NOT_OK(children_[i]->InitArray(array->children[i])); } return NANOARROW_OK; } ArrowErrorCode Read(ArrowBufferView* data, int32_t field_size_bytes, ArrowArray* array, ArrowError* error) override { int16_t n_fields; NANOARROW_RETURN_NOT_OK(ReadChecked<int16_t>(data, &n_fields, error)); if (n_fields == -1) { return ENODATA; } else if (n_fields != array->n_children) { ArrowErrorSet(error, "Expected -1 for end-of-stream or number of fields in output array " "(%ld) but got %d", static_cast<long>(array->n_children), // NOLINT(runtime/int) static_cast<int>(n_fields)); // NOLINT(runtime/int) return EINVAL; } for (int16_t i = 0; i < n_fields; i++) { int32_t child_field_size_bytes; NANOARROW_RETURN_NOT_OK(ReadChecked<int32_t>(data, &child_field_size_bytes, error)); int result = children_[i]->Read(data, child_field_size_bytes, array->children[i], error); // On overflow, pretend all previous children for this struct were never // appended to. This leaves array in a valid state in the specific case // where EOVERFLOW was returned so that a higher level caller can attempt // to try again after creating a new array. if (result == EOVERFLOW) { for (int16_t j = 0; j < i; j++) { array->children[j]->length--; } } if (result != NANOARROW_OK) { return result; } } array->length++; return NANOARROW_OK; } private: std::vector<std::unique_ptr<PostgresCopyFieldReader>> children_; }; // Factory for a PostgresCopyFieldReader that instantiates the proper subclass // and gives a nice error for Postgres type -> Arrow type conversions that aren't // supported. static inline ArrowErrorCode ErrorCantConvert(ArrowError* error, const PostgresType& pg_type, const ArrowSchemaView& schema_view) { ArrowErrorSet(error, "Can't convert Postgres type '%s' to Arrow type '%s'", pg_type.typname().c_str(), ArrowTypeString(schema_view.type)); // NOLINT(runtime/int) return EINVAL; } static inline ArrowErrorCode MakeCopyFieldReader(const PostgresType& pg_type, ArrowSchema* schema, PostgresCopyFieldReader** out, ArrowError* error) { ArrowSchemaView schema_view; NANOARROW_RETURN_NOT_OK(ArrowSchemaViewInit(&schema_view, schema, nullptr)); switch (schema_view.type) { case NANOARROW_TYPE_BOOL: switch (pg_type.type_id()) { case PostgresTypeId::kBool: *out = new PostgresCopyBooleanFieldReader(); return NANOARROW_OK; default: return ErrorCantConvert(error, pg_type, schema_view); } case NANOARROW_TYPE_INT16: switch (pg_type.type_id()) { case PostgresTypeId::kInt2: *out = new PostgresCopyNetworkEndianFieldReader<int16_t>(); return NANOARROW_OK; default: return ErrorCantConvert(error, pg_type, schema_view); } case NANOARROW_TYPE_INT32: switch (pg_type.type_id()) { case PostgresTypeId::kInt4: case PostgresTypeId::kOid: case PostgresTypeId::kRegproc: *out = new PostgresCopyNetworkEndianFieldReader<int32_t>(); return NANOARROW_OK; default: return ErrorCantConvert(error, pg_type, schema_view); } case NANOARROW_TYPE_INT64: switch (pg_type.type_id()) { case PostgresTypeId::kInt8: *out = new PostgresCopyNetworkEndianFieldReader<int64_t>(); return NANOARROW_OK; default: return ErrorCantConvert(error, pg_type, schema_view); } case NANOARROW_TYPE_FLOAT: switch (pg_type.type_id()) { case PostgresTypeId::kFloat4: *out = new PostgresCopyNetworkEndianFieldReader<uint32_t>(); return NANOARROW_OK; default: return ErrorCantConvert(error, pg_type, schema_view); } case NANOARROW_TYPE_DOUBLE: switch (pg_type.type_id()) { case PostgresTypeId::kFloat8: *out = new PostgresCopyNetworkEndianFieldReader<uint64_t>(); return NANOARROW_OK; default: return ErrorCantConvert(error, pg_type, schema_view); } case NANOARROW_TYPE_STRING: switch (pg_type.type_id()) { case PostgresTypeId::kChar: case PostgresTypeId::kVarchar: case PostgresTypeId::kText: case PostgresTypeId::kBpchar: case PostgresTypeId::kName: *out = new PostgresCopyBinaryFieldReader(); return NANOARROW_OK; case PostgresTypeId::kNumeric: *out = new PostgresCopyNumericFieldReader(); return NANOARROW_OK; default: return ErrorCantConvert(error, pg_type, schema_view); } case NANOARROW_TYPE_BINARY: // No need to check pg_type here: we can return the bytes of any // Postgres type as binary. *out = new PostgresCopyBinaryFieldReader(); return NANOARROW_OK; case NANOARROW_TYPE_LIST: switch (pg_type.type_id()) { case PostgresTypeId::kArray: { if (pg_type.n_children() != 1) { ArrowErrorSet( error, "Expected Postgres array type to have one child but found %ld", static_cast<long>(pg_type.n_children())); // NOLINT(runtime/int) return EINVAL; } auto array_reader = std::unique_ptr<PostgresCopyArrayFieldReader>( new PostgresCopyArrayFieldReader()); array_reader->Init(pg_type); PostgresCopyFieldReader* child_reader; NANOARROW_RETURN_NOT_OK(MakeCopyFieldReader( pg_type.child(0), schema->children[0], &child_reader, error)); array_reader->InitChild(std::unique_ptr<PostgresCopyFieldReader>(child_reader)); *out = array_reader.release(); return NANOARROW_OK; } default: return ErrorCantConvert(error, pg_type, schema_view); } case NANOARROW_TYPE_STRUCT: switch (pg_type.type_id()) { case PostgresTypeId::kRecord: { if (pg_type.n_children() != schema->n_children) { ArrowErrorSet(error, "Can't convert Postgres record type with %ld chlidren to Arrow " "struct type with %ld children", static_cast<long>(pg_type.n_children()), // NOLINT(runtime/int) static_cast<long>(schema->n_children)); // NOLINT(runtime/int) return EINVAL; } auto record_reader = std::unique_ptr<PostgresCopyRecordFieldReader>( new PostgresCopyRecordFieldReader()); record_reader->Init(pg_type); for (int64_t i = 0; i < pg_type.n_children(); i++) { PostgresCopyFieldReader* child_reader; NANOARROW_RETURN_NOT_OK(MakeCopyFieldReader( pg_type.child(i), schema->children[i], &child_reader, error)); record_reader->AppendChild( std::unique_ptr<PostgresCopyFieldReader>(child_reader)); } *out = record_reader.release(); return NANOARROW_OK; } default: return ErrorCantConvert(error, pg_type, schema_view); } case NANOARROW_TYPE_DATE32: { // 2000-01-01 constexpr int32_t kPostgresDateEpoch = 10957; *out = new PostgresCopyNetworkEndianFieldReader<int32_t, kPostgresDateEpoch>(); return NANOARROW_OK; } case NANOARROW_TYPE_TIME64: { *out = new PostgresCopyNetworkEndianFieldReader<int64_t>(); return NANOARROW_OK; } case NANOARROW_TYPE_TIMESTAMP: switch (pg_type.type_id()) { case PostgresTypeId::kTimestamp: case PostgresTypeId::kTimestamptz: { // 2000-01-01 00:00:00.000000 in microseconds constexpr int64_t kPostgresTimestampEpoch = 946684800000000; *out = new PostgresCopyNetworkEndianFieldReader<int64_t, kPostgresTimestampEpoch>(); return NANOARROW_OK; } default: return ErrorCantConvert(error, pg_type, schema_view); } case NANOARROW_TYPE_INTERVAL_MONTH_DAY_NANO: switch (pg_type.type_id()) { case PostgresTypeId::kInterval: { *out = new PostgresCopyIntervalFieldReader(); return NANOARROW_OK; } default: return ErrorCantConvert(error, pg_type, schema_view); } default: return ErrorCantConvert(error, pg_type, schema_view); } } class PostgresCopyStreamReader { public: ArrowErrorCode Init(const PostgresType& pg_type) { if (pg_type.type_id() != PostgresTypeId::kRecord) { return EINVAL; } root_reader_.Init(pg_type); array_size_approx_bytes_ = 0; return NANOARROW_OK; } int64_t array_size_approx_bytes() const { return array_size_approx_bytes_; } ArrowErrorCode SetOutputSchema(ArrowSchema* schema, ArrowError* error) { if (std::string(schema_->format) != "+s") { ArrowErrorSet( error, "Expected output schema of type struct but got output schema with format '%s'", schema_->format); // NOLINT(runtime/int) return EINVAL; } if (schema_->n_children != root_reader_.InputType().n_children()) { ArrowErrorSet(error, "Expected output schema with %ld columns to match Postgres input but " "got schema with %ld columns", static_cast<long>( // NOLINT(runtime/int) root_reader_.InputType().n_children()), static_cast<long>(schema->n_children)); // NOLINT(runtime/int) return EINVAL; } schema_.reset(schema); return NANOARROW_OK; } ArrowErrorCode InferOutputSchema(ArrowError* error) { schema_.reset(); ArrowSchemaInit(schema_.get()); NANOARROW_RETURN_NOT_OK(root_reader_.InputType().SetSchema(schema_.get())); return NANOARROW_OK; } ArrowErrorCode InitFieldReaders(ArrowError* error) { if (schema_->release == nullptr) { return EINVAL; } const PostgresType& root_type = root_reader_.InputType(); for (int64_t i = 0; i < root_type.n_children(); i++) { const PostgresType& child_type = root_type.child(i); PostgresCopyFieldReader* child_reader; NANOARROW_RETURN_NOT_OK( MakeCopyFieldReader(child_type, schema_->children[i], &child_reader, error)); root_reader_.AppendChild(std::unique_ptr<PostgresCopyFieldReader>(child_reader)); } NANOARROW_RETURN_NOT_OK(root_reader_.InitSchema(schema_.get())); return NANOARROW_OK; } ArrowErrorCode ReadHeader(ArrowBufferView* data, ArrowError* error) { if (data->size_bytes < static_cast<int64_t>(sizeof(kPgCopyBinarySignature))) { ArrowErrorSet( error, "Expected PGCOPY signature of %ld bytes at beginning of stream but " "found %ld bytes of input", static_cast<long>(sizeof(kPgCopyBinarySignature)), // NOLINT(runtime/int) static_cast<long>(data->size_bytes)); // NOLINT(runtime/int) return EINVAL; } if (memcmp(data->data.data, kPgCopyBinarySignature, sizeof(kPgCopyBinarySignature)) != 0) { ArrowErrorSet(error, "Invalid PGCOPY signature at beginning of stream"); return EINVAL; } data->data.as_uint8 += sizeof(kPgCopyBinarySignature); data->size_bytes -= sizeof(kPgCopyBinarySignature); uint32_t flags; NANOARROW_RETURN_NOT_OK(ReadChecked<uint32_t>(data, &flags, error)); uint32_t extension_length; NANOARROW_RETURN_NOT_OK(ReadChecked<uint32_t>(data, &extension_length, error)); if (data->size_bytes < static_cast<int64_t>(extension_length)) { ArrowErrorSet(error, "Expected %ld bytes of extension metadata at start of stream but " "found %ld bytes of input", static_cast<long>(extension_length), // NOLINT(runtime/int) static_cast<long>(data->size_bytes)); // NOLINT(runtime/int) return EINVAL; } data->data.as_uint8 += extension_length; data->size_bytes -= extension_length; return NANOARROW_OK; } ArrowErrorCode ReadRecord(ArrowBufferView* data, ArrowError* error) { if (array_->release == nullptr) { NANOARROW_RETURN_NOT_OK( ArrowArrayInitFromSchema(array_.get(), schema_.get(), error)); NANOARROW_RETURN_NOT_OK(ArrowArrayStartAppending(array_.get())); NANOARROW_RETURN_NOT_OK(root_reader_.InitArray(array_.get())); array_size_approx_bytes_ = 0; } const uint8_t* start = data->data.as_uint8; NANOARROW_RETURN_NOT_OK(root_reader_.Read(data, -1, array_.get(), error)); array_size_approx_bytes_ += (data->data.as_uint8 - start); return NANOARROW_OK; } ArrowErrorCode GetSchema(ArrowSchema* out) { return ArrowSchemaDeepCopy(schema_.get(), out); } ArrowErrorCode GetArray(ArrowArray* out, ArrowError* error) { if (array_->release == nullptr) { return EINVAL; } NANOARROW_RETURN_NOT_OK(ArrowArrayFinishBuildingDefault(array_.get(), error)); ArrowArrayMove(array_.get(), out); return NANOARROW_OK; } private: PostgresCopyFieldTupleReader root_reader_; nanoarrow::UniqueSchema schema_; nanoarrow::UniqueArray array_; int64_t array_size_approx_bytes_; }; } // namespace adbcpq