Status ArrowColumnWriter::WriteTimestampsCoerce()

in src/parquet/arrow/writer.cc [636:701]


// Rescales the values of a timestamp array to the target time unit and writes
// them to the column as 64-bit integers.
//
// The target unit is the writer properties' coerce_timestamps_unit() when
// coercion is enabled, and TimeUnit::MICRO otherwise.
//
// \param array the array to write; it is statically cast to
//   ::arrow::TimestampArray, so the caller must pass a timestamp array
// \param num_levels number of entries requested for the scratch buffer and
//   forwarded to the batch writers
// \param def_levels definition levels forwarded to the batch writers
// \param rep_levels repetition levels forwarded to the batch writers
// \return Status::Invalid if a non-null value cannot be divided exactly into
//   the coarser unit, or if scaling a non-null value to a finer unit does not
//   fit in an int64_t; otherwise the status of the underlying batch write
Status ArrowColumnWriter::WriteTimestampsCoerce(const Array& array, int64_t num_levels,
                                                const int16_t* def_levels,
                                                const int16_t* rep_levels) {
  // NOTE(review): the scratch buffer is sized by num_levels while the loops
  // below write array.length() entries; this presumably relies on
  // num_levels >= array.length() -- confirm against the callers.
  int64_t* buffer = nullptr;
  RETURN_NOT_OK(ctx_->GetScratchData<int64_t>(num_levels, &buffer));

  const auto& data = static_cast<const ::arrow::TimestampArray&>(array);

  auto values = data.raw_values();
  const auto& type = static_cast<const ::arrow::TimestampType&>(*array.type());

  TimeUnit::type target_unit = ctx_->properties->coerce_timestamps_enabled()
                                   ? ctx_->properties->coerce_timestamps_unit()
                                   : TimeUnit::MICRO;
  auto target_type = ::arrow::timestamp(target_unit);

  // Converts to a coarser unit. Fails on the first non-null value that is not
  // an exact multiple of `factor`, since truncating it would lose precision.
  auto DivideBy = [&](const int64_t factor) {
    for (int64_t i = 0; i < array.length(); i++) {
      if (!data.IsNull(i) && (values[i] % factor != 0)) {
        std::stringstream ss;
        ss << "Casting from " << type.ToString() << " to " << target_type->ToString()
           << " would lose data: " << values[i];
        return Status::Invalid(ss.str());
      }
      buffer[i] = values[i] / factor;
    }
    return Status::OK();
  };

  // Converts to a finer unit. Fails on the first non-null value whose scaled
  // result does not fit in an int64_t instead of silently wrapping around.
  auto MultiplyBy = [&](const int64_t factor) {
    for (int64_t i = 0; i < array.length(); i++) {
      // Multiply in unsigned arithmetic: signed overflow is undefined
      // behaviour, whereas the unsigned product wraps modulo 2^64. Null slots
      // may hold arbitrary values, so the multiplication itself must be safe
      // for every slot.
      const int64_t scaled = static_cast<int64_t>(static_cast<uint64_t>(values[i]) *
                                                  static_cast<uint64_t>(factor));
      // For a positive factor the product overflowed iff dividing the wrapped
      // result by the factor does not give back the original value.
      if (!data.IsNull(i) && (scaled / factor != values[i])) {
        std::stringstream ss;
        ss << "Casting from " << type.ToString() << " to " << target_type->ToString()
           << " would result in out of bounds timestamp: " << values[i];
        return Status::Invalid(ss.str());
      }
      buffer[i] = scaled;
    }
    return Status::OK();
  };

  // Only NANO->{MICRO,MILLI}, SECOND->{MICRO,MILLI}, MILLI->MICRO and
  // MICRO->MILLI are handled; the remaining branches assume the target unit.
  // NOTE(review): if DCHECK_EQ is compiled out in release builds, any other
  // target unit would be mis-scaled without an error -- confirm callers only
  // request MILLI or MICRO.
  if (type.unit() == TimeUnit::NANO) {
    if (target_unit == TimeUnit::MICRO) {
      RETURN_NOT_OK(DivideBy(1000));
    } else {
      DCHECK_EQ(TimeUnit::MILLI, target_unit);
      RETURN_NOT_OK(DivideBy(1000000));
    }
  } else if (type.unit() == TimeUnit::SECOND) {
    RETURN_NOT_OK(MultiplyBy(target_unit == TimeUnit::MICRO ? 1000000 : 1000));
  } else if (type.unit() == TimeUnit::MILLI) {
    DCHECK_EQ(TimeUnit::MICRO, target_unit);
    RETURN_NOT_OK(MultiplyBy(1000));
  } else {
    // Remaining source unit is MICRO, which can only be coarsened to MILLI.
    DCHECK_EQ(TimeUnit::MILLI, target_unit);
    RETURN_NOT_OK(DivideBy(1000));
  }

  if (writer_->descr()->schema_node()->is_required() || (data.null_count() == 0)) {
    // no nulls, just dump the data
    RETURN_NOT_OK((WriteNonNullableBatch<Int64Type, ::arrow::TimestampType>(
        static_cast<const ::arrow::TimestampType&>(*target_type), array.length(),
        num_levels, def_levels, rep_levels, buffer)));
  } else {
    // Pass the validity bitmap together with the array offset so the writer
    // can tell which slots of the converted buffer are null.
    const uint8_t* valid_bits = data.null_bitmap_data();
    RETURN_NOT_OK((WriteNullableBatch<Int64Type, ::arrow::TimestampType>(
        static_cast<const ::arrow::TimestampType&>(*target_type), array.length(),
        num_levels, def_levels, rep_levels, valid_bits, data.offset(), buffer)));
  }
  return Status::OK();
}