public void process()

in fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorEventProcessor.java [460:516]


    /**
     * Routes a coordinator event to the handler that matches its runtime type.
     *
     * <p>The type checks are evaluated top to bottom and the first match wins. For the adjust-ISR,
     * KV snapshot commit, remote log manifest commit and lake table snapshot commit events, the
     * event's response callback is handed to {@code completeFromCallable} together with the
     * processing step. An event whose type matches none of the checks is only logged at WARN
     * level.
     *
     * <p>{@code updateMetrics()} is invoked after every call, including when a handler throws,
     * because it lives in the {@code finally} clause.
     *
     * @param event the event to dispatch
     */
    public void process(CoordinatorEvent event) {
        try {
            // Table / partition lifecycle events.
            if (event instanceof CreateTableEvent) {
                processCreateTable((CreateTableEvent) event);
                return;
            }
            if (event instanceof CreatePartitionEvent) {
                processCreatePartition((CreatePartitionEvent) event);
                return;
            }
            if (event instanceof DropTableEvent) {
                processDropTable((DropTableEvent) event);
                return;
            }
            if (event instanceof DropPartitionEvent) {
                processDropPartition((DropPartitionEvent) event);
                return;
            }

            // Responses received for earlier requests.
            if (event instanceof NotifyLeaderAndIsrResponseReceivedEvent) {
                processNotifyLeaderAndIsrResponseReceivedEvent(
                        (NotifyLeaderAndIsrResponseReceivedEvent) event);
                return;
            }
            if (event instanceof DeleteReplicaResponseReceivedEvent) {
                processDeleteReplicaResponseReceived((DeleteReplicaResponseReceivedEvent) event);
                return;
            }

            // Tablet server membership changes.
            if (event instanceof NewTabletServerEvent) {
                processNewTabletServer((NewTabletServerEvent) event);
                return;
            }
            if (event instanceof DeadTabletServerEvent) {
                processDeadTabletServer((DeadTabletServerEvent) event);
                return;
            }

            // Events that carry a response callback; the work itself runs inside the callable
            // passed to completeFromCallable.
            if (event instanceof AdjustIsrReceivedEvent) {
                AdjustIsrReceivedEvent adjustIsr = (AdjustIsrReceivedEvent) event;
                CompletableFuture<AdjustIsrResponse> respFuture = adjustIsr.getRespCallback();
                completeFromCallable(
                        respFuture,
                        () ->
                                makeAdjustIsrResponse(
                                        tryProcessAdjustIsr(adjustIsr.getLeaderAndIsrMap())));
                return;
            }
            if (event instanceof CommitKvSnapshotEvent) {
                CommitKvSnapshotEvent commitKvSnapshot = (CommitKvSnapshotEvent) event;
                CompletableFuture<CommitKvSnapshotResponse> respFuture =
                        commitKvSnapshot.getRespCallback();
                completeFromCallable(
                        respFuture, () -> tryProcessCommitKvSnapshot(commitKvSnapshot));
                return;
            }
            if (event instanceof CommitRemoteLogManifestEvent) {
                CommitRemoteLogManifestEvent commitManifest = (CommitRemoteLogManifestEvent) event;
                completeFromCallable(
                        commitManifest.getRespCallback(),
                        () -> tryProcessCommitRemoteLogManifest(commitManifest));
                return;
            }
            if (event instanceof CommitLakeTableSnapshotEvent) {
                CommitLakeTableSnapshotEvent commitLakeSnapshot =
                        (CommitLakeTableSnapshotEvent) event;
                completeFromCallable(
                        commitLakeSnapshot.getRespCallback(),
                        () -> tryProcessCommitLakeTableSnapshot(commitLakeSnapshot));
                return;
            }

            if (event instanceof AccessContextEvent) {
                AccessContextEvent<?> contextAccess = (AccessContextEvent<?>) event;
                processAccessContext(contextAccess);
                return;
            }

            // No check above matched.
            LOG.warn("Unknown event type: {}", event.getClass().getName());
        } finally {
            // Runs on every path out of the try block: early return, fall-through, or exception.
            updateMetrics();
        }
    }