Status TransferColumnData()

in cpp/src/parquet/arrow/reader_internal.cc [832:985]


/// \brief Transfer the column data held by `reader` into a ChunkedArray typed
/// according to `value_field`.
///
/// The conversion routine is selected by the Arrow type id of `value_field`.
/// For HALF_FLOAT, DECIMAL128, DECIMAL256 and TIMESTAMP the Parquet physical
/// type reported by `descr` is inspected as well.
///
/// \param[in] reader record reader the column data is transferred from
/// \param[in] metadata column chunk metadata; moved into the transfer routine
///   on the branches that need it (INT32/INT64/FLOAT/DOUBLE/BOOL and
///   non-INT96 timestamps), otherwise left untouched
/// \param[in] value_field target Arrow field; supplies the type and nullability
/// \param[in] descr Parquet column descriptor; supplies physical type and
///   type length
/// \param[in] ctx reader context; supplies the memory pool
/// \param[out] out receives the result, always wrapped as a ChunkedArray
/// \return Status::OK() on success. Status::NotImplemented for an unsupported
///   Arrow type or timestamp unit, Status::Invalid when the Parquet physical
///   type does not fit the requested Arrow type (or when no result was
///   produced), or whatever error a transfer routine reports.
Status TransferColumnData(RecordReader* reader,
                          std::unique_ptr<::parquet::ColumnChunkMetaData> metadata,
                          const std::shared_ptr<Field>& value_field,
                          const ColumnDescriptor* descr, const ReaderContext* ctx,
                          std::shared_ptr<ChunkedArray>* out) {
  // NOTE(review): TRANSFER_INT32 / TRANSFER_INT64 below are macros defined
  // outside this function; they presumably refer to the locals declared here
  // by name, so check the macro definitions before renaming any of them.
  auto pool = ctx->pool;
  // Holds either a single Array or a ChunkedArray, depending on the branch.
  Datum result;
  std::shared_ptr<ChunkedArray> chunked_result;
  switch (value_field->type()->id()) {
    case ::arrow::Type::DICTIONARY: {
      RETURN_NOT_OK(TransferDictionary(reader, pool, value_field->type(),
                                       value_field->nullable(), &chunked_result));
      result = chunked_result;
    } break;
    case ::arrow::Type::NA: {
      // A null column carries no buffers; only its length matters.
      result = std::make_shared<::arrow::NullArray>(reader->values_written());
      break;
    }
    case ::arrow::Type::INT32:
      result = TransferZeroCopy<::arrow::Int32Type, Int32Type>(
          reader, std::move(metadata), ctx, value_field);
      break;
    case ::arrow::Type::INT64:
      result = TransferZeroCopy<::arrow::Int64Type, Int64Type>(
          reader, std::move(metadata), ctx, value_field);
      break;
    case ::arrow::Type::FLOAT:
      result = TransferZeroCopy<::arrow::FloatType, FloatType>(
          reader, std::move(metadata), ctx, value_field);
      break;
    case ::arrow::Type::DOUBLE:
      result = TransferZeroCopy<::arrow::DoubleType, DoubleType>(
          reader, std::move(metadata), ctx, value_field);
      break;
    case ::arrow::Type::BOOL:
      RETURN_NOT_OK(TransferBool(reader, std::move(metadata), ctx,
                                 value_field->nullable(), &result));
      break;
      // Each macro line below presumably expands to a complete
      // `case ::arrow::Type::<ENUM>: ... break;` clause (they directly follow
      // a `break`, so they would be unreachable otherwise).
      TRANSFER_INT32(UINT8, ::arrow::UInt8Type);
      TRANSFER_INT32(INT8, ::arrow::Int8Type);
      TRANSFER_INT32(UINT16, ::arrow::UInt16Type);
      TRANSFER_INT32(INT16, ::arrow::Int16Type);
      TRANSFER_INT32(UINT32, ::arrow::UInt32Type);
      TRANSFER_INT64(UINT64, ::arrow::UInt64Type);
      TRANSFER_INT32(DATE32, ::arrow::Date32Type);
      TRANSFER_INT32(TIME32, ::arrow::Time32Type);
      TRANSFER_INT64(TIME64, ::arrow::Time64Type);
      TRANSFER_INT64(DURATION, ::arrow::DurationType);
    case ::arrow::Type::DATE64:
      RETURN_NOT_OK(TransferDate64(reader, pool, value_field, &result));
      break;
    case ::arrow::Type::FIXED_SIZE_BINARY:
    case ::arrow::Type::BINARY:
    case ::arrow::Type::STRING:
    case ::arrow::Type::LARGE_BINARY:
    case ::arrow::Type::LARGE_STRING: {
      RETURN_NOT_OK(TransferBinary(reader, pool, value_field, &chunked_result));
      result = chunked_result;
    } break;
    case ::arrow::Type::HALF_FLOAT: {
      const auto& type = *value_field->type();
      // Validate the physical layout before handing off to TransferHalfFloat:
      // it must be a fixed-length byte array of exactly the Arrow byte width.
      if (descr->physical_type() != ::parquet::Type::FIXED_LEN_BYTE_ARRAY) {
        return Status::Invalid("Physical type for ", type.ToString(),
                               " must be fixed length binary");
      }
      if (descr->type_length() != type.byte_width()) {
        return Status::Invalid("Fixed length binary type for ", type.ToString(),
                               " must have a byte width of ", type.byte_width());
      }
      RETURN_NOT_OK(TransferHalfFloat(reader, pool, value_field, &result));
    } break;
    case ::arrow::Type::DECIMAL128: {
      // The conversion routine depends on the Parquet physical type. The
      // function is bound to a local first, presumably so that the comma in
      // the template argument list is not taken as a macro-argument separator
      // by RETURN_NOT_OK.
      switch (descr->physical_type()) {
        case ::parquet::Type::INT32: {
          auto fn = &DecimalIntegerTransfer<Decimal128Array, Int32Type>;
          RETURN_NOT_OK(fn(reader, pool, value_field, &result));
        } break;
        case ::parquet::Type::INT64: {
          auto fn = &DecimalIntegerTransfer<Decimal128Array, Int64Type>;
          RETURN_NOT_OK(fn(reader, pool, value_field, &result));
        } break;
        case ::parquet::Type::BYTE_ARRAY: {
          auto fn = &TransferDecimal<Decimal128Array, ByteArrayType>;
          RETURN_NOT_OK(fn(reader, pool, value_field, &result));
        } break;
        case ::parquet::Type::FIXED_LEN_BYTE_ARRAY: {
          auto fn = &TransferDecimal<Decimal128Array, FLBAType>;
          RETURN_NOT_OK(fn(reader, pool, value_field, &result));
        } break;
        default:
          return Status::Invalid(
              "Physical type for decimal128 must be int32, int64, byte array, or fixed "
              "length binary");
      }
    } break;
    case ::arrow::Type::DECIMAL256: {
      // Same dispatch as DECIMAL128, producing Decimal256Array instead.
      switch (descr->physical_type()) {
        case ::parquet::Type::INT32: {
          auto fn = &DecimalIntegerTransfer<Decimal256Array, Int32Type>;
          RETURN_NOT_OK(fn(reader, pool, value_field, &result));
        } break;
        case ::parquet::Type::INT64: {
          auto fn = &DecimalIntegerTransfer<Decimal256Array, Int64Type>;
          RETURN_NOT_OK(fn(reader, pool, value_field, &result));
        } break;
        case ::parquet::Type::BYTE_ARRAY: {
          auto fn = &TransferDecimal<Decimal256Array, ByteArrayType>;
          RETURN_NOT_OK(fn(reader, pool, value_field, &result));
        } break;
        case ::parquet::Type::FIXED_LEN_BYTE_ARRAY: {
          auto fn = &TransferDecimal<Decimal256Array, FLBAType>;
          RETURN_NOT_OK(fn(reader, pool, value_field, &result));
        } break;
        default:
          return Status::Invalid(
              "Physical type for decimal256 must be int32, int64, byte array, or fixed "
              "length binary");
      }
    } break;

    case ::arrow::Type::TIMESTAMP: {
      const ::arrow::TimestampType& timestamp_type =
          checked_cast<::arrow::TimestampType&>(*value_field->type());
      if (descr->physical_type() == ::parquet::Type::INT96) {
        // INT96 timestamps go through a dedicated routine that is told the
        // target unit.
        RETURN_NOT_OK(
            TransferInt96(reader, pool, value_field, &result, timestamp_type.unit()));
      } else {
        switch (timestamp_type.unit()) {
          case ::arrow::TimeUnit::MILLI:
          case ::arrow::TimeUnit::MICRO:
          case ::arrow::TimeUnit::NANO:
            result = TransferZeroCopy<::arrow::Int64Type, Int64Type>(
                reader, std::move(metadata), ctx, value_field);
            break;
          default:
            return Status::NotImplemented("TimeUnit not supported");
        }
      }
    } break;
    default:
      return Status::NotImplemented("No support for reading columns of type ",
                                    value_field->type()->ToString());
  }

  // Normalize the result so callers always receive a ChunkedArray.
  if (result.kind() == Datum::ARRAY) {
    *out = std::make_shared<ChunkedArray>(result.make_array());
  } else if (result.kind() == Datum::CHUNKED_ARRAY) {
    *out = result.chunked_array();
  } else {
    DCHECK(false) << "Should be impossible, result was " << result.ToString();
    // When the DCHECK above is compiled out, do not report success while
    // leaving `*out` unassigned; surface the inconsistency to the caller.
    return Status::Invalid("Should be impossible, result was ", result.ToString());
  }

  return Status::OK();
}