in cpp/src/arrow/compute/kernels/vector_selection_filter_internal.cc [598:800]
Status BinaryFilterImpl(KernelContext* ctx, const ArraySpan& values,
const ArraySpan& filter, int64_t output_length,
FilterOptions::NullSelectionBehavior null_selection,
ArrayData* out) {
using offset_type = typename ArrowType::offset_type;
const bool is_ree_filter = filter.type->id() == Type::RUN_END_ENCODED;
BINARY_FILTER_SETUP_COMMON();
const uint8_t* values_is_valid = values.buffers[0].data;
const int64_t values_offset = values.offset;
const int64_t out_offset = out->offset;
uint8_t* out_is_valid = out->buffers[0]->mutable_data();
// Zero bits and then only have to set valid values to true
bit_util::SetBitsTo(out_is_valid, out_offset, output_length, false);
int64_t in_position = 0;
int64_t out_position = 0;
if (is_ree_filter) {
auto emit_segment = [&](int64_t position, int64_t segment_length, bool filter_valid) {
in_position = position;
if (filter_valid) {
// Filter values are all true and not null
// Some of the values in the block may be null
for (int64_t i = 0; i < segment_length; ++i, ++in_position, ++out_position) {
offset_builder.UnsafeAppend(offset);
if (bit_util::GetBit(values_is_valid, values_offset + in_position)) {
bit_util::SetBit(out_is_valid, out_offset + out_position);
APPEND_SINGLE_VALUE();
}
}
} else {
offset_builder.UnsafeAppend(segment_length, offset);
out_position += segment_length;
}
return Status::OK();
};
Status status;
VisitPlainxREEFilterOutputSegments(
filter, /*filter_may_have_nulls=*/true, null_selection,
[&status, emit_segment = std::move(emit_segment)](
int64_t position, int64_t segment_length, bool filter_valid) {
status = emit_segment(position, segment_length, filter_valid);
return status.ok();
});
RETURN_NOT_OK(std::move(status));
} else {
const auto filter_data = filter.buffers[1].data;
const uint8_t* filter_is_valid = filter.buffers[0].data;
const int64_t filter_offset = filter.offset;
// We use 3 block counters for fast scanning of the filter
//
// * values_valid_counter: for values null/not-null
// * filter_valid_counter: for filter null/not-null
// * filter_counter: for filter true/false
OptionalBitBlockCounter values_valid_counter(values_is_valid, values_offset,
values.length);
OptionalBitBlockCounter filter_valid_counter(filter_is_valid, filter_offset,
filter.length);
BitBlockCounter filter_counter(filter_data, filter_offset, filter.length);
while (in_position < filter.length) {
BitBlockCount filter_valid_block = filter_valid_counter.NextWord();
BitBlockCount values_valid_block = values_valid_counter.NextWord();
BitBlockCount filter_block = filter_counter.NextWord();
if (filter_block.NoneSet() && null_selection == FilterOptions::DROP) {
// For this exceedingly common case in low-selectivity filters we can
// skip further analysis of the data and move on to the next block.
in_position += filter_block.length;
} else if (filter_valid_block.AllSet()) {
// Simpler path: no filter values are null
if (filter_block.AllSet()) {
// Fastest path: filter values are all true and not null
if (values_valid_block.AllSet()) {
// The values aren't null either
bit_util::SetBitsTo(out_is_valid, out_offset + out_position,
filter_block.length, true);
// Bulk-append raw data
offset_type block_data_bytes =
(raw_offsets[in_position + filter_block.length] -
raw_offsets[in_position]);
APPEND_RAW_DATA(raw_data + raw_offsets[in_position], block_data_bytes);
// Append offsets
for (int64_t i = 0; i < filter_block.length; ++i, ++in_position) {
offset_builder.UnsafeAppend(offset);
offset += raw_offsets[in_position + 1] - raw_offsets[in_position];
}
out_position += filter_block.length;
} else {
// Some of the values in this block are null
for (int64_t i = 0; i < filter_block.length;
++i, ++in_position, ++out_position) {
offset_builder.UnsafeAppend(offset);
if (bit_util::GetBit(values_is_valid, values_offset + in_position)) {
bit_util::SetBit(out_is_valid, out_offset + out_position);
APPEND_SINGLE_VALUE();
}
}
}
} else { // !filter_block.AllSet()
// Some of the filter values are false, but all not null
if (values_valid_block.AllSet()) {
// All the values are not-null, so we can skip null checking for
// them
for (int64_t i = 0; i < filter_block.length; ++i, ++in_position) {
if (bit_util::GetBit(filter_data, filter_offset + in_position)) {
offset_builder.UnsafeAppend(offset);
bit_util::SetBit(out_is_valid, out_offset + out_position++);
APPEND_SINGLE_VALUE();
}
}
} else {
// Some of the values in the block are null, so we have to check
// each one
for (int64_t i = 0; i < filter_block.length; ++i, ++in_position) {
if (bit_util::GetBit(filter_data, filter_offset + in_position)) {
offset_builder.UnsafeAppend(offset);
if (bit_util::GetBit(values_is_valid, values_offset + in_position)) {
bit_util::SetBit(out_is_valid, out_offset + out_position);
APPEND_SINGLE_VALUE();
}
++out_position;
}
}
}
}
} else { // !filter_valid_block.AllSet()
// Some of the filter values are null, so we have to handle the DROP
// versus EMIT_NULL null selection behavior.
if (null_selection == FilterOptions::DROP) {
// Filter null values are treated as false.
if (values_valid_block.AllSet()) {
for (int64_t i = 0; i < filter_block.length; ++i, ++in_position) {
if (bit_util::GetBit(filter_is_valid, filter_offset + in_position) &&
bit_util::GetBit(filter_data, filter_offset + in_position)) {
offset_builder.UnsafeAppend(offset);
bit_util::SetBit(out_is_valid, out_offset + out_position++);
APPEND_SINGLE_VALUE();
}
}
} else {
for (int64_t i = 0; i < filter_block.length; ++i, ++in_position) {
if (bit_util::GetBit(filter_is_valid, filter_offset + in_position) &&
bit_util::GetBit(filter_data, filter_offset + in_position)) {
offset_builder.UnsafeAppend(offset);
if (bit_util::GetBit(values_is_valid, values_offset + in_position)) {
bit_util::SetBit(out_is_valid, out_offset + out_position);
APPEND_SINGLE_VALUE();
}
++out_position;
}
}
}
} else {
// EMIT_NULL
// Filter null values are appended to output as null whether the
// value in the corresponding slot is valid or not
if (values_valid_block.AllSet()) {
for (int64_t i = 0; i < filter_block.length; ++i, ++in_position) {
const bool filter_not_null =
bit_util::GetBit(filter_is_valid, filter_offset + in_position);
if (filter_not_null &&
bit_util::GetBit(filter_data, filter_offset + in_position)) {
offset_builder.UnsafeAppend(offset);
bit_util::SetBit(out_is_valid, out_offset + out_position++);
APPEND_SINGLE_VALUE();
} else if (!filter_not_null) {
offset_builder.UnsafeAppend(offset);
++out_position;
}
}
} else {
for (int64_t i = 0; i < filter_block.length; ++i, ++in_position) {
const bool filter_not_null =
bit_util::GetBit(filter_is_valid, filter_offset + in_position);
if (filter_not_null &&
bit_util::GetBit(filter_data, filter_offset + in_position)) {
offset_builder.UnsafeAppend(offset);
if (bit_util::GetBit(values_is_valid, values_offset + in_position)) {
bit_util::SetBit(out_is_valid, out_offset + out_position);
APPEND_SINGLE_VALUE();
}
++out_position;
} else if (!filter_not_null) {
offset_builder.UnsafeAppend(offset);
++out_position;
}
}
}
}
}
}
}
offset_builder.UnsafeAppend(offset);
out->length = output_length;
RETURN_NOT_OK(offset_builder.Finish(&out->buffers[1]));
return data_builder.Finish(&out->buffers[2]);
}