public void processElement()

in flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java [124:206]


    public void processElement(StreamRecord<Event> streamRecord) throws Exception {
        Event event = streamRecord.getValue();
        if (event instanceof FlushEvent) {
            for (int i = 0; i < totalTasksNumber; i++) {
                output.collect(
                        new StreamRecord<>(
                                new BucketWrapperFlushEvent(
                                        i,
                                        ((FlushEvent) event).getSourceSubTaskId(),
                                        currentTaskNumber,
                                        ((FlushEvent) event).getTableIds(),
                                        ((FlushEvent) event).getSchemaChangeEventType())));
            }
            return;
        }

        if (event instanceof DataChangeEvent) {
            DataChangeEvent dataChangeEvent = (DataChangeEvent) event;
            if (!schemaMaps.containsKey(dataChangeEvent.tableId())) {
                Optional<Schema> schema =
                        schemaEvolutionClient.getLatestEvolvedSchema(dataChangeEvent.tableId());
                if (schema.isPresent()) {
                    schemaMaps.put(
                            dataChangeEvent.tableId(), new TableSchemaInfo(schema.get(), zoneId));
                } else {
                    throw new RuntimeException(
                            "Could not find schema message from SchemaRegistry for "
                                    + dataChangeEvent.tableId());
                }
            }
            Tuple4<BucketMode, RowKeyExtractor, BucketAssigner, RowPartitionKeyExtractor> tuple4 =
                    bucketAssignerMap.computeIfAbsent(
                            dataChangeEvent.tableId(), this::getTableInfo);
            int bucket;
            GenericRow genericRow =
                    PaimonWriterHelper.convertEventToGenericRow(
                            dataChangeEvent,
                            schemaMaps.get(dataChangeEvent.tableId()).getFieldGetters());
            switch (tuple4.f0) {
                case HASH_DYNAMIC:
                    {
                        bucket =
                                tuple4.f2.assign(
                                        tuple4.f3.partition(genericRow),
                                        tuple4.f3.trimmedPrimaryKey(genericRow).hashCode());
                        break;
                    }
                case HASH_FIXED:
                    {
                        tuple4.f1.setRecord(genericRow);
                        bucket = tuple4.f1.bucket();
                        break;
                    }
                case BUCKET_UNAWARE:
                    {
                        bucket = 0;
                        break;
                    }
                case CROSS_PARTITION:
                default:
                    {
                        throw new RuntimeException("Unsupported bucket mode: " + tuple4.f0);
                    }
            }
            output.collect(
                    new StreamRecord<>(new BucketWrapperChangeEvent(bucket, (ChangeEvent) event)));
        } else if (event instanceof SchemaChangeEvent) {
            SchemaChangeEvent schemaChangeEvent = (SchemaChangeEvent) event;
            Schema schema =
                    SchemaUtils.applySchemaChangeEvent(
                            Optional.ofNullable(schemaMaps.get(schemaChangeEvent.tableId()))
                                    .map(TableSchemaInfo::getSchema)
                                    .orElse(null),
                            schemaChangeEvent);
            schemaMaps.put(schemaChangeEvent.tableId(), new TableSchemaInfo(schema, zoneId));
            // Broadcast SchemachangeEvent.
            for (int index = 0; index < totalTasksNumber; index++) {
                output.collect(
                        new StreamRecord<>(
                                new BucketWrapperChangeEvent(index, (ChangeEvent) event)));
            }
        }
    }