Status BinaryFilterImpl()

in cpp/src/arrow/compute/kernels/vector_selection_filter_internal.cc [598:800]


// Filter an offset-based (variable-length) `values` array with `filter`, for the
// case where the output may contain nulls.
//
// Output layout written to `out`:
//   * buffers[0]: validity bitmap (must already be allocated by the caller; it is
//     cleared here and then only valid slots are set)
//   * buffers[1]: offsets, produced by `offset_builder`
//   * buffers[2]: value bytes, produced by `data_builder`
//   * length:     set to `output_length`
//
// \param ctx kernel context; not referenced directly in this body (presumably
//        consumed by BINARY_FILTER_SETUP_COMMON -- confirm against the macro)
// \param values input array; its validity bitmap (buffers[0]) may be absent, in
//        which case every value is treated as non-null
// \param filter either a RUN_END_ENCODED filter or a plain filter whose validity
//        bitmap is buffers[0] and whose selection bits are buffers[1]
// \param output_length number of output slots, computed by the caller
// \param null_selection whether null filter slots are dropped (DROP) or emitted
//        as null output slots (EMIT_NULL)
// \param out destination array data
// \return the first non-OK Status encountered, otherwise the result of finishing
//         the data builder
//
// NOTE(review): `ArrowType`, `raw_offsets`, `raw_data`, `offset`, `offset_builder`
// and `data_builder` are not declared in this body; they presumably come from the
// enclosing template header and from BINARY_FILTER_SETUP_COMMON -- confirm.
Status BinaryFilterImpl(KernelContext* ctx, const ArraySpan& values,
                        const ArraySpan& filter, int64_t output_length,
                        FilterOptions::NullSelectionBehavior null_selection,
                        ArrayData* out) {
  using offset_type = typename ArrowType::offset_type;

  const bool is_ree_filter = filter.type->id() == Type::RUN_END_ENCODED;

  BINARY_FILTER_SETUP_COMMON();

  // May be nullptr when `values` carries no validity bitmap; the plain-filter
  // path copes with that through OptionalBitBlockCounter, and the REE path
  // checks for it explicitly.
  const uint8_t* values_is_valid = values.buffers[0].data;
  const int64_t values_offset = values.offset;

  const int64_t out_offset = out->offset;
  uint8_t* out_is_valid = out->buffers[0]->mutable_data();
  // Zero bits and then only have to set valid values to true
  bit_util::SetBitsTo(out_is_valid, out_offset, output_length, false);

  int64_t in_position = 0;
  int64_t out_position = 0;
  if (is_ree_filter) {
    // Emits one output segment of `segment_length` slots starting at input
    // `position`. Returns a Status, presumably so that APPEND_SINGLE_VALUE (a
    // macro defined elsewhere) can propagate allocation failures -- confirm.
    auto emit_segment = [&](int64_t position, int64_t segment_length, bool filter_valid) {
      in_position = position;
      if (filter_valid) {
        // Filter values are all true and not null
        // Some of the values in the block may be null
        for (int64_t i = 0; i < segment_length; ++i, ++in_position, ++out_position) {
          offset_builder.UnsafeAppend(offset);
          // An absent validity bitmap means the value is non-null; without this
          // check GetBit would read through a null pointer.
          if (values_is_valid == nullptr ||
              bit_util::GetBit(values_is_valid, values_offset + in_position)) {
            bit_util::SetBit(out_is_valid, out_offset + out_position);
            APPEND_SINGLE_VALUE();
          }
        }
      } else {
        // Filter segment is not valid: emit `segment_length` empty slots by
        // repeating the current offset. Their validity bits stay cleared, so
        // they come out as nulls.
        offset_builder.UnsafeAppend(segment_length, offset);
        out_position += segment_length;
      }
      return Status::OK();
    };
    Status status;
    // The visitor callback returns a bool, so the Status is carried out through
    // `status` and checked after the visit.
    VisitPlainxREEFilterOutputSegments(
        filter, /*filter_may_have_nulls=*/true, null_selection,
        [&status, emit_segment = std::move(emit_segment)](
            int64_t position, int64_t segment_length, bool filter_valid) {
          status = emit_segment(position, segment_length, filter_valid);
          return status.ok();
        });
    RETURN_NOT_OK(std::move(status));
  } else {
    const auto filter_data = filter.buffers[1].data;
    const uint8_t* filter_is_valid = filter.buffers[0].data;
    const int64_t filter_offset = filter.offset;

    // We use 3 block counters for fast scanning of the filter
    //
    // * values_valid_counter: for values null/not-null
    // * filter_valid_counter: for filter null/not-null
    // * filter_counter: for filter true/false
    OptionalBitBlockCounter values_valid_counter(values_is_valid, values_offset,
                                                 values.length);
    OptionalBitBlockCounter filter_valid_counter(filter_is_valid, filter_offset,
                                                 filter.length);
    BitBlockCounter filter_counter(filter_data, filter_offset, filter.length);

    // The three counters are advanced in lock step; every branch below consumes
    // exactly `filter_block.length` input slots.
    // NOTE(review): this assumes all three NextWord() calls return blocks of the
    // same length -- confirm against the block counter implementations.
    while (in_position < filter.length) {
      BitBlockCount filter_valid_block = filter_valid_counter.NextWord();
      BitBlockCount values_valid_block = values_valid_counter.NextWord();
      BitBlockCount filter_block = filter_counter.NextWord();
      if (filter_block.NoneSet() && null_selection == FilterOptions::DROP) {
        // For this exceedingly common case in low-selectivity filters we can
        // skip further analysis of the data and move on to the next block.
        in_position += filter_block.length;
      } else if (filter_valid_block.AllSet()) {
        // Simpler path: no filter values are null
        if (filter_block.AllSet()) {
          // Fastest path: filter values are all true and not null
          if (values_valid_block.AllSet()) {
            // The values aren't null either
            bit_util::SetBitsTo(out_is_valid, out_offset + out_position,
                                filter_block.length, true);

            // Bulk-append raw data
            offset_type block_data_bytes =
                (raw_offsets[in_position + filter_block.length] -
                 raw_offsets[in_position]);
            APPEND_RAW_DATA(raw_data + raw_offsets[in_position], block_data_bytes);
            // Append offsets
            for (int64_t i = 0; i < filter_block.length; ++i, ++in_position) {
              offset_builder.UnsafeAppend(offset);
              offset += raw_offsets[in_position + 1] - raw_offsets[in_position];
            }
            out_position += filter_block.length;
          } else {
            // Some of the values in this block are null
            for (int64_t i = 0; i < filter_block.length;
                 ++i, ++in_position, ++out_position) {
              offset_builder.UnsafeAppend(offset);
              if (bit_util::GetBit(values_is_valid, values_offset + in_position)) {
                bit_util::SetBit(out_is_valid, out_offset + out_position);
                APPEND_SINGLE_VALUE();
              }
            }
          }
        } else {  // !filter_block.AllSet()
          // Some of the filter values are false, but all not null
          if (values_valid_block.AllSet()) {
            // All the values are not-null, so we can skip null checking for
            // them
            for (int64_t i = 0; i < filter_block.length; ++i, ++in_position) {
              if (bit_util::GetBit(filter_data, filter_offset + in_position)) {
                offset_builder.UnsafeAppend(offset);
                bit_util::SetBit(out_is_valid, out_offset + out_position++);
                APPEND_SINGLE_VALUE();
              }
            }
          } else {
            // Some of the values in the block are null, so we have to check
            // each one
            for (int64_t i = 0; i < filter_block.length; ++i, ++in_position) {
              if (bit_util::GetBit(filter_data, filter_offset + in_position)) {
                offset_builder.UnsafeAppend(offset);
                if (bit_util::GetBit(values_is_valid, values_offset + in_position)) {
                  bit_util::SetBit(out_is_valid, out_offset + out_position);
                  APPEND_SINGLE_VALUE();
                }
                ++out_position;
              }
            }
          }
        }
      } else {  // !filter_valid_block.AllSet()
        // Some of the filter values are null, so we have to handle the DROP
        // versus EMIT_NULL null selection behavior.
        if (null_selection == FilterOptions::DROP) {
          // Filter null values are treated as false.
          if (values_valid_block.AllSet()) {
            for (int64_t i = 0; i < filter_block.length; ++i, ++in_position) {
              if (bit_util::GetBit(filter_is_valid, filter_offset + in_position) &&
                  bit_util::GetBit(filter_data, filter_offset + in_position)) {
                offset_builder.UnsafeAppend(offset);
                bit_util::SetBit(out_is_valid, out_offset + out_position++);
                APPEND_SINGLE_VALUE();
              }
            }
          } else {
            for (int64_t i = 0; i < filter_block.length; ++i, ++in_position) {
              if (bit_util::GetBit(filter_is_valid, filter_offset + in_position) &&
                  bit_util::GetBit(filter_data, filter_offset + in_position)) {
                offset_builder.UnsafeAppend(offset);
                if (bit_util::GetBit(values_is_valid, values_offset + in_position)) {
                  bit_util::SetBit(out_is_valid, out_offset + out_position);
                  APPEND_SINGLE_VALUE();
                }
                ++out_position;
              }
            }
          }
        } else {
          // EMIT_NULL

          // Filter null values are appended to output as null whether the
          // value in the corresponding slot is valid or not
          if (values_valid_block.AllSet()) {
            for (int64_t i = 0; i < filter_block.length; ++i, ++in_position) {
              const bool filter_not_null =
                  bit_util::GetBit(filter_is_valid, filter_offset + in_position);
              if (filter_not_null &&
                  bit_util::GetBit(filter_data, filter_offset + in_position)) {
                offset_builder.UnsafeAppend(offset);
                bit_util::SetBit(out_is_valid, out_offset + out_position++);
                APPEND_SINGLE_VALUE();
              } else if (!filter_not_null) {
                // Null filter slot: emit an empty, null output slot
                offset_builder.UnsafeAppend(offset);
                ++out_position;
              }
            }
          } else {
            for (int64_t i = 0; i < filter_block.length; ++i, ++in_position) {
              const bool filter_not_null =
                  bit_util::GetBit(filter_is_valid, filter_offset + in_position);
              if (filter_not_null &&
                  bit_util::GetBit(filter_data, filter_offset + in_position)) {
                offset_builder.UnsafeAppend(offset);
                if (bit_util::GetBit(values_is_valid, values_offset + in_position)) {
                  bit_util::SetBit(out_is_valid, out_offset + out_position);
                  APPEND_SINGLE_VALUE();
                }
                ++out_position;
              } else if (!filter_not_null) {
                // Null filter slot: emit an empty, null output slot
                offset_builder.UnsafeAppend(offset);
                ++out_position;
              }
            }
          }
        }
      }
    }
  }
  // Final offset closes the last emitted slot
  offset_builder.UnsafeAppend(offset);
  out->length = output_length;
  RETURN_NOT_OK(offset_builder.Finish(&out->buffers[1]));
  return data_builder.Finish(&out->buffers[2]);
}