in src/main/java/org/apache/paimon/trino/TrinoMergeSink.java [47:106]
/**
 * Splits a merge page into a delete batch and an insert batch and hands each non-empty batch
 * to {@code pageSink}.
 *
 * <p>The page must carry exactly {@code dataColumnCount + 2} channels. The first
 * {@code dataColumnCount} channels are forwarded as row data; the channel at index
 * {@code channelCount - 2} is read as one tinyint operation code per position. The last
 * channel is not read by this method.
 *
 * <p>Operation codes 1 and 4 route a position to the insert batch, codes 2 and 5 route it to
 * the delete batch, and every other code (including 3) is rejected. Both batches are built
 * before anything is written, and the delete batch is written before the insert batch.
 *
 * @param page merge page to split; must contain at least one position
 * @throws IllegalArgumentException if the channel count does not match, the page has no
 *     positions, or a position carries an unsupported operation code
 */
public void storeMergedRows(Page page) {
    int channels = page.getChannelCount();
    if (channels != dataColumnCount + 2) {
        throw new IllegalArgumentException(
                String.format(
                        "inputPage channelCount (%s) == dataColumns size (%s) + 2",
                        channels, dataColumnCount));
    }

    int rows = page.getPositionCount();
    if (rows <= 0) {
        throw new IllegalArgumentException("positionCount should be > 0, but is " + rows);
    }

    // The operation code lives in the second-to-last channel.
    // NOTE(review): codes presumably mirror the engine's merge operation numbers - confirm.
    Block opCodes = page.getBlock(channels - 2);

    // Worst case every position lands in the same bucket, so size both for the full page.
    int[] removedRows = new int[rows];
    int[] addedRows = new int[rows];
    int removedCount = 0;
    int addedCount = 0;

    for (int row = 0; row < rows; row++) {
        byte opCode = TinyintType.TINYINT.getByte(opCodes, row);
        if (opCode == 1 || opCode == 4) {
            addedRows[addedCount++] = row;
        } else if (opCode == 2 || opCode == 5) {
            removedRows[removedCount++] = row;
        } else {
            throw new IllegalArgumentException("Invalid merge operation: " + opCode);
        }
    }

    // Indices of the data channels only; the two trailing channels are dropped.
    int[] dataChannels = IntStream.range(0, dataColumnCount).toArray();

    Optional<Page> removed =
            removedCount > 0
                    ? Optional.of(
                            page.getColumns(dataChannels)
                                    .getPositions(removedRows, 0, removedCount))
                    : Optional.empty();
    Optional<Page> added =
            addedCount > 0
                    ? Optional.of(
                            page.getColumns(dataChannels)
                                    .getPositions(addedRows, 0, addedCount))
                    : Optional.empty();

    // Deletes go out first, then inserts.
    if (removed.isPresent()) {
        pageSink.writePage(removed.get(), RowKind.DELETE);
    }
    if (added.isPresent()) {
        pageSink.writePage(added.get(), RowKind.INSERT);
    }
}