func binaryFilterImpl[OffsetT int32 | int64]()

in arrow/compute/internal/kernels/vector_selection.go [1200:1369]


// binaryFilterImpl applies a boolean filter to a variable-length binary
// span and writes the selected values into out.
//
// Buffer layout read from the inputs:
//   - values.Buffers[0]: validity bitmap, values.Buffers[1]: offsets of
//     type OffsetT, values.Buffers[2]: raw value bytes.
//   - filter.Buffers[0]: validity bitmap, filter.Buffers[1]: boolean bits.
//
// out.Buffers[0] is written as the output validity bitmap; it is expected
// to be allocated and zeroed by the caller, since only valid bits are set
// here. out.Buffers[1] and out.Buffers[2] are replaced with newly built
// offsets and data buffers, and out.Len is set to outputLen.
//
// outputLen is used to size the offsets buffer and is stored in out.Len
// as-is; it is not recomputed here (presumably the caller derives it from
// the filter and nullSelection -- confirm against the caller).
//
// nullSelection controls null filter slots: with DropNulls they are treated
// as false; otherwise a null output slot is emitted for each of them.
//
// The function always returns nil.
func binaryFilterImpl[OffsetT int32 | int64](ctx *exec.KernelCtx, values, filter *exec.ArraySpan, outputLen int64, nullSelection NullSelectionBehavior, out *exec.ExecResult) error {
	var (
		filterData    = filter.Buffers[1].Buf
		filterIsValid = filter.Buffers[0].Buf
		filterOffset  = filter.Offset

		valuesIsValid = values.Buffers[0].Buf
		valuesOffset  = values.Offset
		// output bitmap should already be zeroed out so we just
		// have to set valid bits to true
		outIsValid = out.Buffers[0].Buf

		rawOffsets    = exec.GetSpanOffsets[OffsetT](values, 1)
		rawData       = values.Buffers[2].Buf
		offsetBuilder = newBufferBuilder[OffsetT](exec.GetAllocator(ctx.Ctx))
		dataBuilder   = newBufferBuilder[uint8](exec.GetAllocator(ctx.Ctx))
	)

	// one offset per output slot plus the trailing end offset
	offsetBuilder.reserve(int(outputLen) + 1)
	if values.Len > 0 {
		// estimate the output data size from the mean input value length
		meanValueLength := float64(rawOffsets[values.Len]-rawOffsets[0]) / float64(values.Len)
		dataBuilder.reserve(int(meanValueLength * float64(outputLen)))
	}

	// spaceAvail tracks the unused capacity of dataBuilder so that the
	// common append path needs no call into reserve.
	spaceAvail := dataBuilder.cap()
	// offset is the running end position of the output data; it is
	// appended to the offsets buffer before each emitted slot.
	var offset OffsetT

	// we use 3 block counters for fast scanning of the filter
	//
	// * valuesValidCounter: for values null/not-null
	// * filterValidCounter: for filter null/not-null
	// * filterCounter: for filter true/false
	valuesValidCounter := bitutils.NewOptionalBitBlockCounter(valuesIsValid, valuesOffset, values.Len)
	filterValidCounter := bitutils.NewOptionalBitBlockCounter(filterIsValid, filterOffset, filter.Len)
	filterCounter := bitutils.NewBitBlockCounter(filterData, filterOffset, filter.Len)

	inPos, outPos := int64(0), int64(0)

	// appendRaw copies data to the output data buffer, growing it first
	// when the remaining tracked capacity is insufficient.
	appendRaw := func(data []byte) {
		if len(data) > spaceAvail {
			dataBuilder.reserve(len(data))
			spaceAvail = dataBuilder.cap() - dataBuilder.len()
		}
		dataBuilder.unsafeAppendSlice(data)
		spaceAvail -= len(data)
	}

	// appendSingle copies the value at the current inPos and advances the
	// running output offset by its length.
	appendSingle := func() {
		data := rawData[rawOffsets[inPos]:rawOffsets[inPos+1]]
		appendRaw(data)
		offset += OffsetT(len(data))
	}

	for inPos < filter.Len {
		filterValidBlock, valuesValidBlock := filterValidCounter.NextWord(), valuesValidCounter.NextWord()
		filterBlock := filterCounter.NextWord()

		blockLen := int64(filterBlock.Len)
		// every branch below consumes exactly this block of input
		blockEnd := inPos + blockLen

		switch {
		case filterBlock.NoneSet() && nullSelection == DropNulls:
			// for this exceedingly common case in low-selectivity filters
			// we can skip further analysis of the data and move on to the
			// next block
			inPos = blockEnd
		case filterValidBlock.AllSet():
			// simpler path: no filter values are null
			if filterBlock.AllSet() {
				// fastest path: filter values are all true and not null
				if valuesValidBlock.AllSet() {
					// the values aren't null either
					bitutil.SetBitsTo(outIsValid, outPos, blockLen, true)

					// bulk-append raw data for the whole block
					start, end := rawOffsets[inPos], rawOffsets[blockEnd]
					appendRaw(rawData[start:end])
					// append offsets
					for ; inPos < blockEnd; inPos++ {
						offsetBuilder.unsafeAppend(offset)
						offset += rawOffsets[inPos+1] - rawOffsets[inPos]
					}
					outPos += blockLen
				} else {
					// some of the values in this block are null
					for ; inPos < blockEnd; inPos, outPos = inPos+1, outPos+1 {
						offsetBuilder.unsafeAppend(offset)
						if bitutil.BitIsSet(valuesIsValid, int(valuesOffset+inPos)) {
							bitutil.SetBit(outIsValid, int(outPos))
							appendSingle()
						}
					}
				}
				continue
			}
			// !filterBlock.AllSet()
			// some of the filter values are false, but all not null
			if valuesValidBlock.AllSet() {
				// all the values are non-null, so we can skip null checking
				for ; inPos < blockEnd; inPos++ {
					if bitutil.BitIsSet(filterData, int(filterOffset+inPos)) {
						offsetBuilder.unsafeAppend(offset)
						bitutil.SetBit(outIsValid, int(outPos))
						outPos++
						appendSingle()
					}
				}
			} else {
				// some of the values in the block are null, so we have to check
				for ; inPos < blockEnd; inPos++ {
					if bitutil.BitIsSet(filterData, int(filterOffset+inPos)) {
						offsetBuilder.unsafeAppend(offset)
						if bitutil.BitIsSet(valuesIsValid, int(valuesOffset+inPos)) {
							bitutil.SetBit(outIsValid, int(outPos))
							appendSingle()
						}
						outPos++
					}
				}
			}
		default:
			// !filterValidBlock.AllSet()
			// some of the filter values are null, so we have to handle
			// the DROP vs EMIT_NULL null selection behavior
			if nullSelection == DropNulls {
				// filter null values are treated as false
				if valuesValidBlock.AllSet() {
					for ; inPos < blockEnd; inPos++ {
						if bitutil.BitIsSet(filterIsValid, int(filterOffset+inPos)) &&
							bitutil.BitIsSet(filterData, int(filterOffset+inPos)) {
							offsetBuilder.unsafeAppend(offset)
							bitutil.SetBit(outIsValid, int(outPos))
							outPos++
							appendSingle()
						}
					}
				} else {
					for ; inPos < blockEnd; inPos++ {
						if bitutil.BitIsSet(filterIsValid, int(filterOffset+inPos)) &&
							bitutil.BitIsSet(filterData, int(filterOffset+inPos)) {
							offsetBuilder.unsafeAppend(offset)
							if bitutil.BitIsSet(valuesIsValid, int(valuesOffset+inPos)) {
								bitutil.SetBit(outIsValid, int(outPos))
								appendSingle()
							}
							outPos++
						}
					}
				}
			} else {
				// EMIT_NULL: a null filter slot produces a null output slot
				// regardless of the corresponding value.
				//
				// The values validity bitmap must only be read when the block
				// is not all-valid: every other branch that reads it is
				// already guarded that way, and the optional block counter
				// suggests the bitmap can be absent (nil), in which case an
				// unguarded read would index out of range.
				valuesAllValid := valuesValidBlock.AllSet()
				for ; inPos < blockEnd; inPos++ {
					filterNotNull := bitutil.BitIsSet(filterIsValid, int(filterOffset+inPos))
					if filterNotNull && bitutil.BitIsSet(filterData, int(filterOffset+inPos)) {
						offsetBuilder.unsafeAppend(offset)
						if valuesAllValid || bitutil.BitIsSet(valuesIsValid, int(valuesOffset+inPos)) {
							bitutil.SetBit(outIsValid, int(outPos))
							appendSingle()
						}
						outPos++
					} else if !filterNotNull {
						// null filter slot: emit an empty, null output slot
						offsetBuilder.unsafeAppend(offset)
						outPos++
					}
				}
			}
		}
	}

	// trailing offset marking the end of the last output value
	offsetBuilder.unsafeAppend(offset)
	out.Len = outputLen
	out.Buffers[1].WrapBuffer(offsetBuilder.finish())
	out.Buffers[2].WrapBuffer(dataBuilder.finish())
	return nil
}