in tephra-core/src/main/java/org/apache/tephra/distributed/TransactionService.java [86:143]
/**
 * Starts the service by creating and starting a leader election on {@code /tx.service/leader}.
 *
 * <p>When this instance becomes leader, the handler obtains a transaction manager from the
 * provider, creates the pruning service, builds the Thrift RPC server, then starts the server and
 * the pruning service and registers. When it becomes a follower, the handler stops the pruning
 * service and the server and un-registers.
 *
 * <p>{@code notifyStarted()} is called right after the election is started; it does not wait for
 * this instance to become leader.
 */
protected void doStart() {
  leaderElection = new LeaderElection(zkClient, "/tx.service/leader", new ElectionHandler() {
    @Override
    public void leader() {
      txManager = txManagerProvider.get();
      // If the txManager fails, we should stop the server.
      txManager.addListener(new ServiceListenerAdapter() {
        @Override
        public void failed(State from, Throwable failure) {
          // Pass the failure to the logger so the cause is not lost.
          LOG.error("Transaction manager aborted, stopping transaction service", failure);
          TransactionService.this.abort(failure);
        }
      }, MoreExecutors.sameThreadExecutor());

      pruningService = createPruningService(conf, txManager);

      server = ThriftRPCServer.builder(TTransactionServer.class)
        .setHost(address)
        .setPort(port)
        .setWorkerThreads(threads)
        .setMaxReadBufferBytes(maxReadBufferBytes)
        .setIOThreads(ioThreads)
        .build(new TransactionServiceThriftHandler(txManager, pruningService));
      try {
        server.startAndWait();
        pruningService.startAndWait();
        doRegister();
        LOG.info("Transaction Thrift Service started successfully on " + getAddress());
      } catch (Throwable t) {
        // A failed startup is an error, not an informational event; include the cause in the log.
        LOG.error("Transaction Thrift Service didn't start on " + server.getBindAddress(), t);
        // Leave the election, then report the failure for this service.
        leaderElection.stop();
        notifyFailed(t);
      }
    }

    @Override
    public void follower() {
      ListenableFuture<State> stopFuture = null;
      if (pruningService != null && pruningService.isRunning()) {
        // Only initiate the stop here; its completion is awaited at the end of this method,
        // after the server has been stopped and un-registered from discovery.
        stopFuture = pruningService.stop();
      }
      // First stop the transaction server as un-registering from discovery can block sometimes.
      // That can lead to multiple transaction servers being active at the same time.
      if (server != null && server.isRunning()) {
        server.stopAndWait();
      }
      undoRegister();
      if (stopFuture != null) {
        // Block until the pruning service has finished stopping.
        Futures.getUnchecked(stopFuture);
      }
    }
  });
  leaderElection.start();

  notifyStarted();
}