Status ExecBatchBuilder::AppendSelected()

in cpp/src/arrow/compute/light_array_internal.cc [481:635]


// Appends `num_rows_to_append` rows of `source`, chosen by `row_ids`, to the end of
// `target`, growing target's buffers as needed.
//
// - source: column to read rows from; its validity buffer (buffers[0]) may be null.
// - target: destination column. When it currently holds no rows it is initialized
//   from source->type (and `pool`) before anything is written.
// - num_rows_to_append: number of entries of `row_ids` to consume.
// - row_ids: indices of the source rows to copy, handed to Visit / CollectBits
//   (presumably relative to source->offset, which is passed alongside -- confirm).
// - pool: memory pool passed to target->Init.
//
// Returns Status::Invalid when, for a varying-length column, the running int32
// offset would overflow. Other errors are propagated from target->Init,
// ResizeFixedLengthBuffers and ResizeVaryingLengthBuffer.
// NOTE(review): the overflow error is returned after ResizeFixedLengthBuffers has
// run and after some offsets were rewritten. If that resize also updates the row
// count (not visible here), target is left partially appended on that path.
Status ExecBatchBuilder::AppendSelected(const std::shared_ptr<ArrayData>& source,
                                        ResizableArrayData* target,
                                        int num_rows_to_append, const uint16_t* row_ids,
                                        MemoryPool* pool) {
  // New rows are written starting at index num_rows_before of every target buffer.
  int num_rows_before = target->num_rows();
  ARROW_DCHECK(num_rows_before >= 0);
  int num_rows_after = num_rows_before + num_rows_to_append;
  // An empty target is initialized with the source's type before first use.
  if (target->num_rows() == 0) {
    RETURN_NOT_OK(target->Init(source->type, pool, kLogNumRows));
  }
  // Grow the buffers whose size depends only on the row count (presumably the
  // validity bits and buffer 1). Buffer 2 (varying-length bytes) is resized
  // separately below, once the new offsets are known.
  RETURN_NOT_OK(target->ResizeFixedLengthBuffers(num_rows_after));

  // Since target->Init is called before, we can assume that the ColumnMetadata
  // would never fail to be created
  // (ValueOrDie() is used instead of propagating a Status; this assumes every
  // append to a given target uses the same source type.)
  KeyColumnMetadata column_metadata =
      ColumnMetadataFromDataType(source->type).ValueOrDie();

  if (column_metadata.is_fixed_length) {
    // Fixed length column
    //
    // Values live in target buffer 1. The switch picks a copy routine based on
    // the per-value width in bytes.
    uint32_t fixed_length = column_metadata.fixed_length;
    switch (fixed_length) {
      case 0:
        // Width 0: values are bit-packed (presumably boolean) and are gathered
        // bit by bit from source buffer 1.
        CollectBits(source->buffers[1]->data(), source->offset, target->mutable_data(1),
                    num_rows_before, num_rows_to_append, row_ids);
        break;
      case 1:
        // Widths 1, 2, 4 and 8: one typed load/store per selected row.
        Visit(source, num_rows_to_append, row_ids,
              [&](int i, const uint8_t* ptr, int32_t num_bytes) {
                target->mutable_data(1)[num_rows_before + i] = *ptr;
              });
        break;
      case 2:
        Visit(
            source, num_rows_to_append, row_ids,
            [&](int i, const uint8_t* ptr, int32_t num_bytes) {
              reinterpret_cast<uint16_t*>(target->mutable_data(1))[num_rows_before + i] =
                  *reinterpret_cast<const uint16_t*>(ptr);
            });
        break;
      case 4:
        Visit(
            source, num_rows_to_append, row_ids,
            [&](int i, const uint8_t* ptr, int32_t num_bytes) {
              reinterpret_cast<uint32_t*>(target->mutable_data(1))[num_rows_before + i] =
                  *reinterpret_cast<const uint32_t*>(ptr);
            });
        break;
      case 8:
        Visit(
            source, num_rows_to_append, row_ids,
            [&](int i, const uint8_t* ptr, int32_t num_bytes) {
              reinterpret_cast<uint64_t*>(target->mutable_data(1))[num_rows_before + i] =
                  *reinterpret_cast<const uint64_t*>(ptr);
            });
        break;
      default: {
        // Other widths are copied in whole 64-bit words, so each copy may read and
        // write up to 7 bytes past the value. NumRowsToSkip reports how many
        // trailing selected rows must not be copied that way (presumably because
        // the over-read would run past the end of the source buffer). Those rows
        // are copied afterwards with an exact-size memcpy.
        // NOTE(review): the over-write of the last word-copied row presumably lands
        // in the next row's slot (rewritten later) or in buffer padding -- confirm
        // ResizeFixedLengthBuffers pads the allocation.
        int num_rows_to_process =
            num_rows_to_append -
            NumRowsToSkip(source, num_rows_to_append, row_ids, sizeof(uint64_t));
        Visit(source, num_rows_to_process, row_ids,
              [&](int i, const uint8_t* ptr, int32_t num_bytes) {
                // Row addresses are computed in int64 to avoid int32 overflow.
                uint64_t* dst = reinterpret_cast<uint64_t*>(
                    target->mutable_data(1) +
                    static_cast<int64_t>(num_bytes) * (num_rows_before + i));
                const uint64_t* src = reinterpret_cast<const uint64_t*>(ptr);
                for (uint32_t word_id = 0;
                     word_id < bit_util::CeilDiv(num_bytes, sizeof(uint64_t));
                     ++word_id) {
                  util::SafeStore<uint64_t>(dst + word_id, util::SafeLoad(src + word_id));
                }
              });
        // Tail rows: exact-size copies, placed after the rows handled above.
        if (num_rows_to_append > num_rows_to_process) {
          Visit(source, num_rows_to_append - num_rows_to_process,
                row_ids + num_rows_to_process,
                [&](int i, const uint8_t* ptr, int32_t num_bytes) {
                  uint64_t* dst = reinterpret_cast<uint64_t*>(
                      target->mutable_data(1) +
                      static_cast<int64_t>(num_bytes) *
                          (num_rows_before + num_rows_to_process + i));
                  const uint64_t* src = reinterpret_cast<const uint64_t*>(ptr);
                  memcpy(dst, src, num_bytes);
                });
        }
      }
    }
  } else {
    // Varying length column
    //

    // Step 1: calculate target offsets
    //
    // Buffer 1 holds int32 offsets: one start offset per row plus a trailing end
    // offset. The selected rows' byte lengths are first stored in their offset
    // slots. They are then converted in place into start offsets (an exclusive
    // prefix sum) that continue from the end of the data already in target.
    int32_t* offsets = reinterpret_cast<int32_t*>(target->mutable_data(1));
    // offsets[num_rows_before] is the end offset of the existing rows. An empty
    // target starts at 0 instead of reading that slot.
    int32_t sum = num_rows_before == 0 ? 0 : offsets[num_rows_before];
    Visit(source, num_rows_to_append, row_ids,
          [&](int i, const uint8_t* ptr, int32_t num_bytes) {
            offsets[num_rows_before + i] = num_bytes;
          });
    for (int i = 0; i < num_rows_to_append; ++i) {
      int32_t length = offsets[num_rows_before + i];
      offsets[num_rows_before + i] = sum;
      // The running total must stay within int32; report overflow as an error
      // instead of wrapping.
      int32_t new_sum_maybe_overflow = 0;
      if (ARROW_PREDICT_FALSE(
              arrow::internal::AddWithOverflow(sum, length, &new_sum_maybe_overflow))) {
        return Status::Invalid("Overflow detected in ExecBatchBuilder when appending ",
                               num_rows_before + i + 1, "-th element of length ", length,
                               " bytes to current length ", sum, " bytes");
      }
      sum = new_sum_maybe_overflow;
    }
    // Trailing end offset: total byte size of the varying-length data.
    offsets[num_rows_before + num_rows_to_append] = sum;

    // Step 2: resize output buffers
    //
    // Must follow Step 1: buffer 2 is presumably sized from the end offset just
    // written. mutable_data(2) is only read after this call.
    RETURN_NOT_OK(target->ResizeVaryingLengthBuffer());

    // Step 3: copy varying-length data
    //
    // Same strategy as the fixed-length default case: copy in 64-bit words
    // except for the trailing rows reported by NumRowsToSkip, which are copied
    // with an exact-size memcpy.
    int num_rows_to_process =
        num_rows_to_append -
        NumRowsToSkip(source, num_rows_to_append, row_ids, sizeof(uint64_t));
    Visit(source, num_rows_to_process, row_ids,
          [&](int i, const uint8_t* ptr, int32_t num_bytes) {
            uint64_t* dst = reinterpret_cast<uint64_t*>(target->mutable_data(2) +
                                                        offsets[num_rows_before + i]);
            const uint64_t* src = reinterpret_cast<const uint64_t*>(ptr);
            for (uint32_t word_id = 0;
                 word_id < bit_util::CeilDiv(num_bytes, sizeof(uint64_t)); ++word_id) {
              util::SafeStore<uint64_t>(dst + word_id, util::SafeLoad(src + word_id));
            }
          });
    // Unlike the fixed-length path, this tail Visit is not guarded. When there
    // are no tail rows it is called with a count of 0.
    Visit(source, num_rows_to_append - num_rows_to_process, row_ids + num_rows_to_process,
          [&](int i, const uint8_t* ptr, int32_t num_bytes) {
            auto dst = target->mutable_data(2) +
                       offsets[num_rows_before + num_rows_to_process + i];
            memcpy(dst, ptr, num_bytes);
          });
  }

  // Process nulls
  //
  // Bits [num_rows_before, num_rows_after) of target buffer 0 receive the
  // validity of the selected rows.
  if (source->buffers[0] == NULLPTR) {
    // No source validity buffer: mark every appended row as valid. The first
    // partially-used byte is OR-ed so bits of earlier rows are preserved.
    // Following bytes are set to 0xff wholesale.
    // NOTE(review): this also sets bits past num_rows_after within the last byte
    // touched. It writes byte num_rows_before / 8 even when appending 0 rows.
    // Presumably harmless because later appends overwrite those bits -- confirm
    // that CollectBits assigns bits rather than OR-ing them.
    uint8_t* dst = target->mutable_data(0);
    dst[num_rows_before / 8] |= static_cast<uint8_t>(~0ULL << (num_rows_before & 7));
    for (int i = num_rows_before / 8 + 1;
         i < bit_util::BytesForBits(num_rows_before + num_rows_to_append); ++i) {
      dst[i] = 0xff;
    }
  } else {
    // Gather the selected rows' validity bits from the source bitmap.
    CollectBits(source->buffers[0]->data(), source->offset, target->mutable_data(0),
                num_rows_before, num_rows_to_append, row_ids);
  }

  return Status::OK();
}