in fluss-server/src/main/java/com/alibaba/fluss/server/tablet/TabletServer.java [300:388]
/**
 * Stops every service owned by this server while holding {@code lock}.
 *
 * <p>Each component is closed in its own try/catch so that a failure in one component never
 * prevents the remaining components from being shut down. Failures are accumulated through
 * {@link ExceptionUtils#firstOrSuppressed(Throwable, Throwable)} and, if any occurred, added
 * to the returned future as an exceptionally completed future.
 *
 * <p>Components that are {@code null} (for example because startup did not get far enough
 * to create them) are skipped.
 *
 * @return the future produced by {@code FutureUtils.completeAll} over the asynchronous
 *     terminations of the metric registry and the RPC server, plus an exceptionally
 *     completed future when any synchronous shutdown step threw
 */
CompletableFuture<Void> stopServices() {
    synchronized (lock) {
        Throwable exception = null;

        try {
            if (tabletServerMetricGroup != null) {
                tabletServerMetricGroup.close();
            }
        } catch (Throwable t) {
            exception = ExceptionUtils.firstOrSuppressed(t, exception);
        }

        // Up to three futures: metric registry, rpc server, and the collected failure.
        final Collection<CompletableFuture<Void>> terminationFutures = new ArrayList<>(3);

        try {
            if (metricRegistry != null) {
                terminationFutures.add(metricRegistry.closeAsync());
            }
        } catch (Throwable t) {
            exception = ExceptionUtils.firstOrSuppressed(t, exception);
        }

        try {
            if (rpcServer != null) {
                terminationFutures.add(rpcServer.closeAsync());
            }
        } catch (Throwable t) {
            exception = ExceptionUtils.firstOrSuppressed(t, exception);
        }

        try {
            if (tabletService != null) {
                tabletService.shutdown();
            }
        } catch (Throwable t) {
            exception = ExceptionUtils.firstOrSuppressed(t, exception);
        }

        // Every remaining component gets its own try/catch: an exception thrown while
        // closing one of them must not skip the shutdown of the components that follow.
        try {
            if (zkClient != null) {
                zkClient.close();
            }
        } catch (Throwable t) {
            exception = ExceptionUtils.firstOrSuppressed(t, exception);
        }

        // TODO currently, rpc client don't have timeout logic. After implementing the
        // timeout logic, we need to move the closure of rpc client to after the closure of
        // replica manager.
        try {
            if (rpcClient != null) {
                rpcClient.close();
            }
        } catch (Throwable t) {
            exception = ExceptionUtils.firstOrSuppressed(t, exception);
        }

        try {
            if (clientMetricGroup != null) {
                clientMetricGroup.close();
            }
        } catch (Throwable t) {
            exception = ExceptionUtils.firstOrSuppressed(t, exception);
        }

        // We must shut down the scheduler early because otherwise, the scheduler could
        // touch other resources that might have been shutdown and cause exceptions.
        try {
            if (scheduler != null) {
                scheduler.shutdown();
            }
        } catch (Throwable t) {
            exception = ExceptionUtils.firstOrSuppressed(t, exception);
        }

        try {
            if (kvManager != null) {
                kvManager.shutdown();
            }
        } catch (Throwable t) {
            exception = ExceptionUtils.firstOrSuppressed(t, exception);
        }

        try {
            if (remoteLogManager != null) {
                remoteLogManager.close();
            }
        } catch (Throwable t) {
            exception = ExceptionUtils.firstOrSuppressed(t, exception);
        }

        try {
            if (logManager != null) {
                logManager.shutdown();
            }
        } catch (Throwable t) {
            exception = ExceptionUtils.firstOrSuppressed(t, exception);
        }

        try {
            if (replicaManager != null) {
                replicaManager.shutdown();
            }
        } catch (Throwable t) {
            exception = ExceptionUtils.firstOrSuppressed(t, exception);
        }

        try {
            if (authorizer != null) {
                authorizer.close();
            }
        } catch (Throwable t) {
            exception = ExceptionUtils.firstOrSuppressed(t, exception);
        }

        // Surface any synchronous shutdown failure through the returned future.
        if (exception != null) {
            terminationFutures.add(FutureUtils.completedExceptionally(exception));
        }

        return FutureUtils.completeAll(terminationFutures);
    }
}