void apply()

in velox/functions/prestosql/MapConcat.cpp [31:155]


  void apply(
      const SelectivityVector& rows,
      std::vector<VectorPtr>& args,
      const TypePtr& outputType,
      exec::EvalCtx* context,
      VectorPtr* result) const override {
    VELOX_CHECK(args.size() >= 2);
    auto mapType = args[0]->type();
    VELOX_CHECK_EQ(mapType->kind(), TypeKind::MAP);
    for (auto& arg : args) {
      VELOX_CHECK(mapType->kindEquals(arg->type()));
    }
    VELOX_CHECK(mapType->kindEquals(outputType));

    auto numArgs = args.size();
    exec::DecodedArgs decodedArgs(rows, args, context);
    vector_size_t maxSize = 0;
    for (auto i = 0; i < numArgs; i++) {
      auto decodedArg = decodedArgs.at(i);
      auto inputMap = decodedArg->base()->as<MapVector>();
      auto rawSizes = inputMap->rawSizes();
      rows.applyToSelected([&](vector_size_t row) {
        maxSize += rawSizes[decodedArg->index(row)];
      });
    }

    auto keyType = outputType->asMap().keyType();
    auto valueType = outputType->asMap().valueType();

    auto combinedKeys = BaseVector::create(keyType, maxSize, context->pool());
    auto combinedValues =
        BaseVector::create(valueType, maxSize, context->pool());

    // Initialize offsets and sizes to 0 so that canonicalize() will
    // work also for sparse 'rows'.
    BufferPtr offsets = allocateOffsets(rows.size(), context->pool());
    auto rawOffsets = offsets->asMutable<vector_size_t>();

    BufferPtr sizes = allocateSizes(rows.size(), context->pool());
    auto rawSizes = sizes->asMutable<vector_size_t>();

    vector_size_t offset = 0;
    rows.applyToSelected([&](vector_size_t row) {
      rawOffsets[row] = offset;
      // Reuse the last offset and size if null key must create empty map
      for (auto i = 0; i < numArgs; i++) {
        auto decodedArg = decodedArgs.at(i);
        if (EmptyForNull && decodedArg->isNullAt(row)) {
          continue; // Treat NULL maps as empty.
        }
        auto inputMap = decodedArg->base()->as<MapVector>();
        auto index = decodedArg->index(row);
        auto inputOffset = inputMap->offsetAt(index);
        auto inputSize = inputMap->sizeAt(index);
        combinedKeys->copy(
            inputMap->mapKeys().get(), offset, inputOffset, inputSize);
        combinedValues->copy(
            inputMap->mapValues().get(), offset, inputOffset, inputSize);
        offset += inputSize;
      }
      rawSizes[row] = offset - rawOffsets[row];
    });

    auto combinedMap = std::make_shared<MapVector>(
        context->pool(),
        outputType,
        BufferPtr(nullptr),
        rows.size(),
        offsets,
        sizes,
        combinedKeys,
        combinedValues);

    MapVector::canonicalize(combinedMap, true);

    combinedKeys = combinedMap->mapKeys();
    combinedValues = combinedMap->mapValues();

    // Check for duplicate keys
    SelectivityVector uniqueKeys(offset);
    vector_size_t duplicateCnt = 0;
    rows.applyToSelected([&](vector_size_t row) {
      auto mapOffset = rawOffsets[row];
      auto mapSize = rawSizes[row];
      if (duplicateCnt) {
        rawOffsets[row] -= duplicateCnt;
      }
      for (vector_size_t i = 1; i < mapSize; i++) {
        if (combinedKeys->equalValueAt(
                combinedKeys.get(), mapOffset + i, mapOffset + i - 1)) {
          duplicateCnt++;
          // "remove" duplicate entry
          uniqueKeys.setValid(mapOffset + i - 1, false);
          rawSizes[row]--;
        }
      }
    });

    if (duplicateCnt) {
      uniqueKeys.updateBounds();
      auto uniqueCount = uniqueKeys.countSelected();

      BufferPtr uniqueIndices = allocateIndices(uniqueCount, context->pool());
      auto rawUniqueIndices = uniqueIndices->asMutable<vector_size_t>();
      vector_size_t index = 0;
      uniqueKeys.applyToSelected(
          [&](vector_size_t row) { rawUniqueIndices[index++] = row; });

      auto keys = BaseVector::transpose(uniqueIndices, std::move(combinedKeys));
      auto values =
          BaseVector::transpose(uniqueIndices, std::move(combinedValues));

      combinedMap = std::make_shared<MapVector>(
          context->pool(),
          outputType,
          BufferPtr(nullptr),
          rows.size(),
          offsets,
          sizes,
          keys,
          values);
    }

    context->moveOrCopyResult(combinedMap, rows, result);
  }