Status PrimitiveImpl::NextBatch()

in src/parquet/arrow/reader.cc [1305:1387]


// Reads up to `records_to_read` records for this primitive column and hands the
// buffered values to the type-specific transfer logic that fills `*out`.
//
// Reading phase:
//   The record reader is reserved for `records_to_read` records and reset.
//   Records are then pulled until the request is satisfied or HasMoreData()
//   reports false. A ReadRecords() call that yields zero records triggers
//   NextRowGroup() so reading can continue in the following row group.
//   Any ::parquet::ParquetException thrown during this phase is returned as
//   Status::IOError carrying the exception message.
//
// Conversion phase:
//   The Arrow field type selects a TRANSFER_CASE / TRANSFER_DATA expansion
//   (macros defined elsewhere in this file; presumably they build the Arrow
//   array from `record_reader_` into `*out` and propagate any error status).
//     - NA:        a NullArray of length values_written(), then passed to
//                  WrapIntoListArray<Int32Type>.
//     - DECIMAL:   Parquet physical type must be INT32, INT64 or
//                  FIXED_LEN_BYTE_ARRAY; otherwise Status::Invalid.
//     - TIMESTAMP: MILLI/MICRO are read as INT64, NANO as INT96; any other
//                  unit yields Status::NotImplemented.
//     - Any Arrow type not listed below yields Status::NotImplemented.
//
// @param records_to_read  maximum number of records to read in this batch.
// @param out              destination for the resulting array.
// @return Status::OK() on success, otherwise one of the errors above or an
//         error propagated by the transfer macros / WrapIntoListArray.
Status PrimitiveImpl::NextBatch(int64_t records_to_read, std::shared_ptr<Array>* out) {
  try {
    // Reserving capacity up front is much faster for flat columns.
    record_reader_->Reserve(records_to_read);
    record_reader_->Reset();

    // Keep pulling records until the request is met or the reader runs dry.
    while (records_to_read > 0 && record_reader_->HasMoreData()) {
      const int64_t got = record_reader_->ReadRecords(records_to_read);
      if (got == 0) {
        // Nothing left in the current row group; advance to the next one.
        NextRowGroup();
        continue;
      }
      records_to_read -= got;
    }
  } catch (const ::parquet::ParquetException& e) {
    return ::arrow::Status::IOError(e.what());
  }

  switch (field_->type()->id()) {
    // Boolean and integer types.
    TRANSFER_CASE(BOOL, ::arrow::BooleanType, BooleanType)
    TRANSFER_CASE(INT8, ::arrow::Int8Type, Int32Type)
    TRANSFER_CASE(UINT8, ::arrow::UInt8Type, Int32Type)
    TRANSFER_CASE(INT16, ::arrow::Int16Type, Int32Type)
    TRANSFER_CASE(UINT16, ::arrow::UInt16Type, Int32Type)
    TRANSFER_CASE(INT32, ::arrow::Int32Type, Int32Type)
    TRANSFER_CASE(UINT32, ::arrow::UInt32Type, Int32Type)
    TRANSFER_CASE(INT64, ::arrow::Int64Type, Int64Type)
    TRANSFER_CASE(UINT64, ::arrow::UInt64Type, Int64Type)

    // Floating point types.
    TRANSFER_CASE(FLOAT, ::arrow::FloatType, FloatType)
    TRANSFER_CASE(DOUBLE, ::arrow::DoubleType, DoubleType)

    // Variable- and fixed-width binary types.
    TRANSFER_CASE(STRING, ::arrow::StringType, ByteArrayType)
    TRANSFER_CASE(BINARY, ::arrow::BinaryType, ByteArrayType)
    TRANSFER_CASE(FIXED_SIZE_BINARY, ::arrow::FixedSizeBinaryType, FLBAType)

    // Date and time-of-day types (both date widths come from INT32 storage).
    TRANSFER_CASE(DATE32, ::arrow::Date32Type, Int32Type)
    TRANSFER_CASE(DATE64, ::arrow::Date64Type, Int32Type)
    TRANSFER_CASE(TIME32, ::arrow::Time32Type, Int32Type)
    TRANSFER_CASE(TIME64, ::arrow::Time64Type, Int64Type)

    case ::arrow::Type::NA: {
      *out = std::make_shared<::arrow::NullArray>(record_reader_->values_written());
      return WrapIntoListArray<Int32Type>(out);
    }

    case ::arrow::Type::DECIMAL: {
      // The source Parquet physical type decides which transfer is used.
      const auto physical = descr_->physical_type();
      if (physical == ::parquet::Type::INT32) {
        TRANSFER_DATA(::arrow::Decimal128Type, Int32Type);
      } else if (physical == ::parquet::Type::INT64) {
        TRANSFER_DATA(::arrow::Decimal128Type, Int64Type);
      } else if (physical == ::parquet::Type::FIXED_LEN_BYTE_ARRAY) {
        TRANSFER_DATA(::arrow::Decimal128Type, FLBAType);
      } else {
        return Status::Invalid(
            "Physical type for decimal must be int32, int64, or fixed length binary");
      }
    } break;

    case ::arrow::Type::TIMESTAMP: {
      // The timestamp unit decides whether values come from INT64 or INT96.
      const auto unit =
          static_cast<const ::arrow::TimestampType&>(*field_->type()).unit();
      if (unit == ::arrow::TimeUnit::MILLI || unit == ::arrow::TimeUnit::MICRO) {
        TRANSFER_DATA(::arrow::TimestampType, Int64Type);
      } else if (unit == ::arrow::TimeUnit::NANO) {
        TRANSFER_DATA(::arrow::TimestampType, Int96Type);
      } else {
        return Status::NotImplemented("TimeUnit not supported");
      }
    } break;

    default:
      return Status::NotImplemented("No support for reading columns of type " +
                                    field_->type()->ToString());
  }

  return Status::OK();
}