in cpp/src/arrow/compute/light_array_internal.cc [481:635]
// Appends the values of `source` picked by `row_ids[0 .. num_rows_to_append)` to the
// end of `target`.
//
// `target` is initialized from `source->type` when it currently holds no rows. Its
// fixed-length buffers are then grown to hold the combined row count, the selected
// values are gathered into buffer 1 (plus buffer 2 for varying-length columns), and
// finally the validity bits are written into buffer 0.
//
// Returns a non-OK Status when one of the target (re)allocations fails, or
// Status::Invalid when the accumulated 32-bit offsets of a varying-length column
// would overflow.
//
// The Visit callbacks used below receive (index within the selection, pointer to the
// value bytes, value length in bytes).
Status ExecBatchBuilder::AppendSelected(const std::shared_ptr<ArrayData>& source,
                                        ResizableArrayData* target,
                                        int num_rows_to_append, const uint16_t* row_ids,
                                        MemoryPool* pool) {
  const int num_rows_before = target->num_rows();
  ARROW_DCHECK(num_rows_before >= 0);
  const int num_rows_after = num_rows_before + num_rows_to_append;
  if (num_rows_before == 0) {
    // First append into this target: set it up for the source's data type.
    RETURN_NOT_OK(target->Init(source->type, pool, kLogNumRows));
  }
  RETURN_NOT_OK(target->ResizeFixedLengthBuffers(num_rows_after));

  // ValueOrDie relies on the expectation that the type was already accepted when the
  // target was initialized, so building the metadata here is not expected to fail.
  const KeyColumnMetadata column_metadata =
      ColumnMetadataFromDataType(source->type).ValueOrDie();

  // Copies `num_bytes` bytes rounded up to whole 64-bit words, i.e. it may read and
  // write up to 7 bytes past the value. NOTE(review): presumably safe because of
  // buffer padding and because NumRowsToSkip excludes the rows where it would not
  // be - confirm against those helpers.
  auto copy_rounded_up_to_words = [](uint8_t* out, const uint8_t* in,
                                     int32_t num_bytes) {
    uint64_t* dst = reinterpret_cast<uint64_t*>(out);
    const uint64_t* src = reinterpret_cast<const uint64_t*>(in);
    const auto num_words = bit_util::CeilDiv(num_bytes, sizeof(uint64_t));
    for (uint32_t word_id = 0; word_id < num_words; ++word_id) {
      util::SafeStore<uint64_t>(dst + word_id, util::SafeLoad(src + word_id));
    }
  };

  if (!column_metadata.is_fixed_length) {
    // ---- Varying-length column ----
    //
    // Phase 1: compute the offsets of the appended values.
    //
    int32_t* offsets = reinterpret_cast<int32_t*>(target->mutable_data(1));
    // Slot of the first appended row; slot [num_rows_to_append] receives the new end.
    int32_t* new_offsets = offsets + num_rows_before;

    // Continue from the end offset of the existing data, if there is any.
    int32_t running_total = 0;
    if (num_rows_before != 0) {
      running_total = new_offsets[0];
    }

    // Temporarily park each value's length in its offset slot...
    Visit(source, num_rows_to_append, row_ids,
          [&](int i, const uint8_t* /*value*/, int32_t num_bytes) {
            new_offsets[i] = num_bytes;
          });
    // ...then turn the lengths into starting offsets with a checked running sum.
    for (int i = 0; i < num_rows_to_append; ++i) {
      const int32_t length = new_offsets[i];
      new_offsets[i] = running_total;
      int32_t next_total = 0;
      if (ARROW_PREDICT_FALSE(
              arrow::internal::AddWithOverflow(running_total, length, &next_total))) {
        return Status::Invalid("Overflow detected in ExecBatchBuilder when appending ",
                               num_rows_before + i + 1, "-th element of length ", length,
                               " bytes to current length ", running_total, " bytes");
      }
      running_total = next_total;
    }
    new_offsets[num_rows_to_append] = running_total;

    // Phase 2: grow the data buffer; the offsets written above must be in place
    // before this call.
    //
    RETURN_NOT_OK(target->ResizeVaryingLengthBuffer());

    // Phase 3: copy the value bytes. Leading rows use the word-wise copy, the
    // remaining rows reported by NumRowsToSkip get an exact-length memcpy.
    //
    const int num_rows_word_copied =
        num_rows_to_append -
        NumRowsToSkip(source, num_rows_to_append, row_ids, sizeof(uint64_t));
    Visit(source, num_rows_word_copied, row_ids,
          [&](int i, const uint8_t* value, int32_t num_bytes) {
            copy_rounded_up_to_words(target->mutable_data(2) + new_offsets[i], value,
                                     num_bytes);
          });
    Visit(source, num_rows_to_append - num_rows_word_copied,
          row_ids + num_rows_word_copied,
          [&](int i, const uint8_t* value, int32_t num_bytes) {
            auto out = target->mutable_data(2) + new_offsets[num_rows_word_copied + i];
            memcpy(out, value, num_bytes);
          });
  } else {
    // ---- Fixed-length column ----
    //
    // Gathers one `Word`-sized value per selected row into slot
    // `num_rows_before + i` of the fixed-length buffer. Only the type of the
    // argument matters; its value is ignored.
    auto gather_scalars = [&](auto word_tag) {
      using Word = decltype(word_tag);
      Visit(source, num_rows_to_append, row_ids,
            [&](int i, const uint8_t* value, int32_t /*num_bytes*/) {
              reinterpret_cast<Word*>(target->mutable_data(1))[num_rows_before + i] =
                  *reinterpret_cast<const Word*>(value);
            });
    };

    const uint32_t value_width = column_metadata.fixed_length;
    switch (value_width) {
      case 0:
        // Zero byte width: the values are gathered bit by bit from buffer 1.
        CollectBits(source->buffers[1]->data(), source->offset, target->mutable_data(1),
                    num_rows_before, num_rows_to_append, row_ids);
        break;
      case 1:
        gather_scalars(uint8_t{});
        break;
      case 2:
        gather_scalars(uint16_t{});
        break;
      case 4:
        gather_scalars(uint32_t{});
        break;
      case 8:
        gather_scalars(uint64_t{});
        break;
      default: {
        // Any other width: word-wise copy for the leading rows, exact-length memcpy
        // for the remaining rows reported by NumRowsToSkip.
        const int num_rows_word_copied =
            num_rows_to_append -
            NumRowsToSkip(source, num_rows_to_append, row_ids, sizeof(uint64_t));
        Visit(source, num_rows_word_copied, row_ids,
              [&](int i, const uint8_t* value, int32_t num_bytes) {
                uint8_t* out = target->mutable_data(1) +
                               static_cast<int64_t>(num_bytes) * (num_rows_before + i);
                copy_rounded_up_to_words(out, value, num_bytes);
              });
        if (num_rows_word_copied < num_rows_to_append) {
          Visit(source, num_rows_to_append - num_rows_word_copied,
                row_ids + num_rows_word_copied,
                [&](int i, const uint8_t* value, int32_t num_bytes) {
                  uint8_t* out = target->mutable_data(1) +
                                 static_cast<int64_t>(num_bytes) *
                                     (num_rows_before + num_rows_word_copied + i);
                  memcpy(out, value, num_bytes);
                });
        }
        break;
      }
    }
  }

  // ---- Validity bits ----
  //
  if (source->buffers[0] != NULLPTR) {
    // The source carries a validity bitmap: gather the selected bits.
    CollectBits(source->buffers[0]->data(), source->offset, target->mutable_data(0),
                num_rows_before, num_rows_to_append, row_ids);
  } else {
    // No validity bitmap in the source: set the bits for the appended rows.
    uint8_t* validity = target->mutable_data(0);
    const int first_byte = num_rows_before / 8;
    // Keep the bits of the existing rows in the partially used byte and set every
    // bit from position (num_rows_before & 7) upwards.
    validity[first_byte] |= static_cast<uint8_t>(~0ULL << (num_rows_before & 7));
    // All following bytes up to the new bitmap length become all-ones.
    const auto num_validity_bytes = bit_util::BytesForBits(num_rows_after);
    for (int byte_id = first_byte + 1; byte_id < num_validity_bytes; ++byte_id) {
      validity[byte_id] = 0xff;
    }
  }

  return Status::OK();
}