in rsc/src/main/java/org/apache/livy/rsc/RSCClient.java [213:258]
/**
 * Stops this client. The call is a no-op when the client is no longer alive; otherwise the
 * client is flagged as not alive before any teardown work begins.
 *
 * <p>Teardown cancels the context info promise and, if requested and the driver RPC was
 * established successfully, sends an end session message and waits up to
 * {@code CLIENT_SHUTDOWN_TIMEOUT} for the RPC channel to close. Whatever the outcome of that
 * step, the RPC is closed, every job in {@code jobs} is failed with an {@code IOException},
 * and the event loop group is asked to shut down gracefully.
 *
 * @param shutdownContext whether to ask the remote context to end its session before the
 *                        connection is torn down.
 */
public synchronized void stop(boolean shutdownContext) {
  if (!isAlive) {
    return;
  }
  isAlive = false;

  try {
    this.contextInfoPromise.cancel(true);

    if (shutdownContext && driverRpc.isSuccess()) {
      protocol.endSession();

      // The remote context closes the channel while handling the end session message, so
      // no reply will ever arrive. Wait for the channel itself to close instead.
      long shutdownTimeoutMs = conf.getTimeAsMs(CLIENT_SHUTDOWN_TIMEOUT);
      driverRpc.get()
        .getChannel()
        .closeFuture()
        .get(shutdownTimeoutMs, TimeUnit.MILLISECONDS);
    }
  } catch (Exception waitError) {
    LOG.warn("Exception while waiting for end session reply.", waitError);
    // NOTE(review): presumably rethrows as an unchecked exception - confirm against Utils.
    Utils.propagate(waitError);
  } finally {
    // Cleanup below runs even if the end session handshake failed or timed out.
    if (driverRpc.isSuccess()) {
      try {
        driverRpc.get().close();
      } catch (Exception closeError) {
        LOG.warn("Error stopping RPC.", closeError);
      }
    }

    // Let clients holding job handles know that their jobs will never complete.
    for (Map.Entry<String, JobHandleImpl<?>> pending : jobs.entrySet()) {
      String jobId = pending.getKey();
      LOG.info("Failing pending job {} due to shutdown.", jobId);
      try {
        pending.getValue().setFailure(new IOException("RSCClient instance stopped."));
      } catch (Exception alreadyFailed) {
        LOG.info("Job " + jobId + " already failed.", alreadyFailed);
      }
    }

    eventLoopGroup.shutdownGracefully();
  }

  if (contextInfo != null) {
    LOG.debug("Disconnected from context {}, shutdown = {}.", contextInfo.clientId,
      shutdownContext);
  }
}