in flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java [124:206]
/**
 * Routes each incoming {@link Event} to a bucket so downstream Paimon writers receive it on
 * the correct subtask.
 *
 * <ul>
 *   <li>{@link FlushEvent}: broadcast to every downstream task, wrapped per-bucket.
 *   <li>{@link DataChangeEvent}: bucket is computed from the table's bucket mode, then the
 *       event is emitted once, wrapped with that bucket.
 *   <li>{@link SchemaChangeEvent}: the locally cached schema is evolved, then the event is
 *       broadcast to every downstream task.
 * </ul>
 *
 * @param streamRecord the wrapped event from upstream
 * @throws Exception if the evolved schema cannot be fetched from the schema registry
 */
public void processElement(StreamRecord<Event> streamRecord) throws Exception {
    Event event = streamRecord.getValue();
    if (event instanceof FlushEvent) {
        // Cast once instead of re-casting for every field access inside the loop.
        FlushEvent flushEvent = (FlushEvent) event;
        // A flush must reach every writer subtask, so emit one wrapped copy per task.
        for (int i = 0; i < totalTasksNumber; i++) {
            output.collect(
                    new StreamRecord<>(
                            new BucketWrapperFlushEvent(
                                    i,
                                    flushEvent.getSourceSubTaskId(),
                                    currentTaskNumber,
                                    flushEvent.getTableIds(),
                                    flushEvent.getSchemaChangeEventType())));
        }
        return;
    }
    if (event instanceof DataChangeEvent) {
        DataChangeEvent dataChangeEvent = (DataChangeEvent) event;
        TableId tableId = dataChangeEvent.tableId();
        if (!schemaMaps.containsKey(tableId)) {
            // Schema not cached yet (e.g. after restore): pull the latest evolved schema
            // from the registry; a data change without a known schema is unrecoverable.
            Optional<Schema> schema = schemaEvolutionClient.getLatestEvolvedSchema(tableId);
            if (schema.isPresent()) {
                schemaMaps.put(tableId, new TableSchemaInfo(schema.get(), zoneId));
            } else {
                throw new RuntimeException(
                        "Could not find schema message from SchemaRegistry for " + tableId);
            }
        }
        // Lazily build (and cache) the bucket-assignment helpers for this table.
        Tuple4<BucketMode, RowKeyExtractor, BucketAssigner, RowPartitionKeyExtractor> tuple4 =
                bucketAssignerMap.computeIfAbsent(tableId, this::getTableInfo);
        int bucket;
        GenericRow genericRow =
                PaimonWriterHelper.convertEventToGenericRow(
                        dataChangeEvent, schemaMaps.get(tableId).getFieldGetters());
        switch (tuple4.f0) {
            case HASH_DYNAMIC:
                {
                    // Dynamic bucketing: assign by partition + primary-key hash.
                    bucket =
                            tuple4.f2.assign(
                                    tuple4.f3.partition(genericRow),
                                    tuple4.f3.trimmedPrimaryKey(genericRow).hashCode());
                    break;
                }
            case HASH_FIXED:
                {
                    // Fixed bucketing: the key extractor derives the bucket from the row.
                    tuple4.f1.setRecord(genericRow);
                    bucket = tuple4.f1.bucket();
                    break;
                }
            case BUCKET_UNAWARE:
                {
                    // Unaware tables have a single logical bucket.
                    bucket = 0;
                    break;
                }
            case CROSS_PARTITION:
            default:
                {
                    throw new RuntimeException("Unsupported bucket mode: " + tuple4.f0);
                }
        }
        // DataChangeEvent is already a ChangeEvent; no cast needed.
        output.collect(new StreamRecord<>(new BucketWrapperChangeEvent(bucket, dataChangeEvent)));
    } else if (event instanceof SchemaChangeEvent) {
        SchemaChangeEvent schemaChangeEvent = (SchemaChangeEvent) event;
        // Evolve the cached schema; null base schema means this is the table's first event.
        Schema schema =
                SchemaUtils.applySchemaChangeEvent(
                        Optional.ofNullable(schemaMaps.get(schemaChangeEvent.tableId()))
                                .map(TableSchemaInfo::getSchema)
                                .orElse(null),
                        schemaChangeEvent);
        schemaMaps.put(schemaChangeEvent.tableId(), new TableSchemaInfo(schema, zoneId));
        // Broadcast SchemaChangeEvent so every downstream task updates its view.
        for (int index = 0; index < totalTasksNumber; index++) {
            output.collect(
                    new StreamRecord<>(new BucketWrapperChangeEvent(index, schemaChangeEvent)));
        }
    }
}