in src/parquet/arrow/writer.cc [636:701]
Status ArrowColumnWriter::WriteTimestampsCoerce(const Array& array, int64_t num_levels,
const int16_t* def_levels,
const int16_t* rep_levels) {
int64_t* buffer;
RETURN_NOT_OK(ctx_->GetScratchData<int64_t>(num_levels, &buffer));
const auto& data = static_cast<const ::arrow::TimestampArray&>(array);
auto values = data.raw_values();
const auto& type = static_cast<const ::arrow::TimestampType&>(*array.type());
TimeUnit::type target_unit = ctx_->properties->coerce_timestamps_enabled()
? ctx_->properties->coerce_timestamps_unit()
: TimeUnit::MICRO;
auto target_type = ::arrow::timestamp(target_unit);
auto DivideBy = [&](const int64_t factor) {
for (int64_t i = 0; i < array.length(); i++) {
if (!data.IsNull(i) && (values[i] % factor != 0)) {
std::stringstream ss;
ss << "Casting from " << type.ToString() << " to " << target_type->ToString()
<< " would lose data: " << values[i];
return Status::Invalid(ss.str());
}
buffer[i] = values[i] / factor;
}
return Status::OK();
};
auto MultiplyBy = [&](const int64_t factor) {
for (int64_t i = 0; i < array.length(); i++) {
buffer[i] = values[i] * factor;
}
return Status::OK();
};
if (type.unit() == TimeUnit::NANO) {
if (target_unit == TimeUnit::MICRO) {
RETURN_NOT_OK(DivideBy(1000));
} else {
DCHECK_EQ(TimeUnit::MILLI, target_unit);
RETURN_NOT_OK(DivideBy(1000000));
}
} else if (type.unit() == TimeUnit::SECOND) {
RETURN_NOT_OK(MultiplyBy(target_unit == TimeUnit::MICRO ? 1000000 : 1000));
} else if (type.unit() == TimeUnit::MILLI) {
DCHECK_EQ(TimeUnit::MICRO, target_unit);
RETURN_NOT_OK(MultiplyBy(1000));
} else {
DCHECK_EQ(TimeUnit::MILLI, target_unit);
RETURN_NOT_OK(DivideBy(1000));
}
if (writer_->descr()->schema_node()->is_required() || (data.null_count() == 0)) {
// no nulls, just dump the data
RETURN_NOT_OK((WriteNonNullableBatch<Int64Type, ::arrow::TimestampType>(
static_cast<const ::arrow::TimestampType&>(*target_type), array.length(),
num_levels, def_levels, rep_levels, buffer)));
} else {
const uint8_t* valid_bits = data.null_bitmap_data();
RETURN_NOT_OK((WriteNullableBatch<Int64Type, ::arrow::TimestampType>(
static_cast<const ::arrow::TimestampType&>(*target_type), array.length(),
num_levels, def_levels, rep_levels, valid_bits, data.offset(), buffer)));
}
return Status::OK();
}