in twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java [848:886]
private boolean handleRestartRunnablesInstances(final Message message, final Runnable completion) {
LOG.debug("Check if it should process a restart runnable instances.");
if (message.getType() != Message.Type.SYSTEM) {
return false;
}
Message.Scope messageScope = message.getScope();
if (messageScope != Message.Scope.RUNNABLE && messageScope != Message.Scope.RUNNABLES) {
return false;
}
Command requestCommand = message.getCommand();
if (!Constants.RESTART_ALL_RUNNABLE_INSTANCES.equals(requestCommand.getCommand()) &&
!Constants.RESTART_RUNNABLES_INSTANCES.equals(requestCommand.getCommand())) {
return false;
}
LOG.debug("Processing restart runnable instances message {}.", message);
if (!Strings.isNullOrEmpty(message.getRunnableName()) && message.getScope() == Message.Scope.RUNNABLE) {
// ... for a runnable ...
String runnableName = message.getRunnableName();
LOG.debug("Start restarting all runnable {} instances.", runnableName);
restartRunnableInstances(runnableName, null, completion);
} else {
// ... or maybe some runnables
for (Map.Entry<String, String> option : requestCommand.getOptions().entrySet()) {
String runnableName = option.getKey();
Set<Integer> restartedInstanceIds = GSON.fromJson(option.getValue(),
new TypeToken<Set<Integer>>() {}.getType());
LOG.debug("Start restarting runnable {} instances {}", runnableName, restartedInstanceIds);
restartRunnableInstances(runnableName, restartedInstanceIds, completion);
}
}
return true;
}