in fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorEventProcessor.java [460:516]
public void process(CoordinatorEvent event) {
    // Routes the event to the handler that matches its runtime type. The checks are
    // evaluated top to bottom and the first match wins; every branch returns as soon
    // as its handler has been invoked. The finally clause calls updateMetrics() on
    // every path out of this method, including when a handler throws.
    try {
        // Table and partition lifecycle events.
        if (event instanceof CreateTableEvent) {
            processCreateTable((CreateTableEvent) event);
            return;
        }
        if (event instanceof CreatePartitionEvent) {
            processCreatePartition((CreatePartitionEvent) event);
            return;
        }
        if (event instanceof DropTableEvent) {
            processDropTable((DropTableEvent) event);
            return;
        }
        if (event instanceof DropPartitionEvent) {
            processDropPartition((DropPartitionEvent) event);
            return;
        }

        // Responses received for earlier requests.
        if (event instanceof NotifyLeaderAndIsrResponseReceivedEvent) {
            processNotifyLeaderAndIsrResponseReceivedEvent(
                    (NotifyLeaderAndIsrResponseReceivedEvent) event);
            return;
        }
        if (event instanceof DeleteReplicaResponseReceivedEvent) {
            processDeleteReplicaResponseReceived((DeleteReplicaResponseReceivedEvent) event);
            return;
        }

        // Tablet server membership changes.
        if (event instanceof NewTabletServerEvent) {
            processNewTabletServer((NewTabletServerEvent) event);
            return;
        }
        if (event instanceof DeadTabletServerEvent) {
            processDeadTabletServer((DeadTabletServerEvent) event);
            return;
        }

        // Events that carry a response callback: the callback and the work to run are
        // both handed to completeFromCallable.
        if (event instanceof AdjustIsrReceivedEvent) {
            AdjustIsrReceivedEvent adjustIsr = (AdjustIsrReceivedEvent) event;
            completeFromCallable(
                    adjustIsr.getRespCallback(),
                    () ->
                            makeAdjustIsrResponse(
                                    tryProcessAdjustIsr(adjustIsr.getLeaderAndIsrMap())));
            return;
        }
        if (event instanceof CommitKvSnapshotEvent) {
            CommitKvSnapshotEvent commitKvSnapshot = (CommitKvSnapshotEvent) event;
            completeFromCallable(
                    commitKvSnapshot.getRespCallback(),
                    () -> tryProcessCommitKvSnapshot(commitKvSnapshot));
            return;
        }
        if (event instanceof CommitRemoteLogManifestEvent) {
            CommitRemoteLogManifestEvent commitManifest = (CommitRemoteLogManifestEvent) event;
            completeFromCallable(
                    commitManifest.getRespCallback(),
                    () -> tryProcessCommitRemoteLogManifest(commitManifest));
            return;
        }
        if (event instanceof CommitLakeTableSnapshotEvent) {
            CommitLakeTableSnapshotEvent commitLakeSnapshot =
                    (CommitLakeTableSnapshotEvent) event;
            completeFromCallable(
                    commitLakeSnapshot.getRespCallback(),
                    () -> tryProcessCommitLakeTableSnapshot(commitLakeSnapshot));
            return;
        }

        if (event instanceof AccessContextEvent) {
            AccessContextEvent<?> contextAccess = (AccessContextEvent<?>) event;
            processAccessContext(contextAccess);
            return;
        }

        // No handler matched: the event is only logged, nothing else is done with it.
        LOG.warn("Unknown event type: {}", event.getClass().getName());
    } finally {
        updateMetrics();
    }
}