public void storeMergedRows()

in src/main/java/org/apache/paimon/trino/TrinoMergeSink.java [47:106]


    /**
     * Splits a merge page into its deleted and inserted rows and forwards each non-empty group
     * to {@code pageSink}.
     *
     * <p>The page is expected to hold {@code dataColumnCount} data channels followed by exactly
     * two extra channels. The first extra channel (index {@code channelCount - 2}) is read as a
     * TINYINT operation code per row; the last channel is not read by this method. Rows with
     * code 1 or 4 are written with {@code RowKind.INSERT}, rows with code 2 or 5 are written
     * with {@code RowKind.DELETE}. Deleted rows are written before inserted rows, and only the
     * first {@code dataColumnCount} channels are passed on.
     *
     * @param page the merge page to split and write
     * @throws IllegalArgumentException if the channel count is not {@code dataColumnCount + 2},
     *     if the page has no positions, or if a row carries any other operation code
     *     (including 3)
     */
    public void storeMergedRows(Page page) {
        int inputChannelCount = page.getChannelCount();
        if (inputChannelCount != dataColumnCount + 2) {
            throw new IllegalArgumentException(
                    String.format(
                            "inputPage channelCount (%s) must equal dataColumns size (%s) + 2",
                            inputChannelCount, dataColumnCount));
        }

        int positionCount = page.getPositionCount();
        if (positionCount <= 0) {
            throw new IllegalArgumentException(
                    "positionCount should be > 0, but is " + positionCount);
        }

        Block operationBlock = page.getBlock(inputChannelCount - 2);
        // Worst case every row lands in the same group, so both arrays are sized for all rows.
        int[] deletePositions = new int[positionCount];
        int[] insertPositions = new int[positionCount];
        int deletePositionCount = 0;
        int insertPositionCount = 0;

        // The codes follow Trino's ConnectorMergeSink operation numbers:
        // 1 = INSERT, 2 = DELETE, 3 = UPDATE, 4 = UPDATE_INSERT, 5 = UPDATE_DELETE.
        for (int position = 0; position < positionCount; ++position) {
            byte operation = TinyintType.TINYINT.getByte(operationBlock, position);
            switch (operation) {
                case 1:
                case 4:
                    insertPositions[insertPositionCount++] = position;
                    break;
                case 2:
                case 5:
                    deletePositions[deletePositionCount++] = position;
                    break;
                case 3: // a plain UPDATE is rejected here, same as an unknown code
                default:
                    throw new IllegalArgumentException("Invalid merge operation: " + operation);
            }
        }

        // Drop the two trailing channels once; every row belongs to one of the two groups, so
        // this projection is always needed at least once.
        Page dataPage = page.getColumns(IntStream.range(0, dataColumnCount).toArray());
        Optional<Page> deletePage = selectRows(dataPage, deletePositions, deletePositionCount);
        Optional<Page> insertPage = selectRows(dataPage, insertPositions, insertPositionCount);

        // Deletes go first, then inserts.
        deletePage.ifPresent(delete -> pageSink.writePage(delete, RowKind.DELETE));
        insertPage.ifPresent(insert -> pageSink.writePage(insert, RowKind.INSERT));
    }

    /**
     * Returns the rows of {@code dataPage} at the first {@code count} entries of
     * {@code positions}, or an empty Optional when {@code count} is not positive.
     */
    private static Optional<Page> selectRows(Page dataPage, int[] positions, int count) {
        if (count <= 0) {
            return Optional.empty();
        }
        return Optional.of(dataPage.getPositions(positions, 0, count));
    }