in core/src/main/java/org/apache/brooklyn/util/core/internal/ssh/sshj/SshjTool.java [342:538]
/**
 * Runs {@code commands} on the remote host as an asynchronous script and polls until it completes.
 * <p>
 * The script produced by {@code toScript(props, commands, env)} is uploaded (up to 3 attempts) and
 * launched via {@code buildRunScriptCommand()}. It is then long-polled for its stdout/stderr and
 * exit status, with polling bounded by {@code execTimeout}. Once the script has been launched
 * successfully, its temporary files are deleted on a best-effort basis, whether or not polling
 * succeeded.
 *
 * @param props configuration for this execution (e.g. {@code PROP_EXEC_ASYNC_POLLING_TIMEOUT})
 * @param commands the commands to run inside the script
 * @param env environment variables passed to {@code toScript}
 * @return one of the following:
 *         <ul>
 *         <li>the script's exit status;</li>
 *         <li>the non-zero result of the command that launches the script;</li>
 *         <li>-1 once {@code longPoll()} has given up after repeated ssh failures;</li>
 *         <li>the exit code of a failed status-retrieval command.</li>
 *         </ul>
 *         Failures while polling, including the {@code TimeoutException} raised when the script
 *         does not finish in time, are logged and rethrown via {@code Exceptions.propagate}.
 */
protected int execScriptAsyncAndPoll(final Map<String,?> props, final List<String> commands, final Map<String,?> env) {
    return new ToolAbstractAsyncExecScript(props) {
        // longPoll() gives up (returning -1) once consecutiveSshFailures exceeds this.
        private int maxConsecutiveSshFailures = 3;
        // Cap on the backoff between attempts, for both the upload and the long-poll Repeaters.
        private Duration maxDelayBetweenPolls = Duration.seconds(20);
        // Longest a single long-poll may wait; also the timeout for the clean-up command.
        private Duration pollTimeout = getOptionalVal(props, PROP_EXEC_ASYNC_POLLING_TIMEOUT, Duration.FIVE_MINUTES);
        // Attempt counter shared by the upload and long-poll phases; only used in log messages.
        private int iteration = 0;
        // Incremented when a long-poll failure could not be resolved by retrieving the status directly.
        // NOTE(review): this is never reset, so it counts all such failures, not strictly consecutive ones.
        private int consecutiveSshFailures = 0;
        // Amount of stdout/stderr already received, so that a repeated long-poll resumes from there.
        private int stdoutCount = 0;
        private int stderrCount = 0;
        // Started at the beginning of run(); caps each long-poll by the remaining execTimeout budget.
        private Stopwatch timer;

        @Override
        public int run() {
            timer = Stopwatch.createStarted();
            final String scriptContents = toScript(props, commands, env);
            if (LOG.isTraceEnabled()) LOG.trace("Running shell command at {} as async script: {}", host, scriptContents);

            // Upload script; try repeatedly because have seen timeout intermittently on vcloud-director (BROOKLYN-106 related).
            boolean uploadSuccess = Repeater.create("async script upload on "+SshjTool.this.toString()+" (for "+getSummary()+")")
                    .backoffTo(maxDelayBetweenPolls)
                    .limitIterationsTo(3)
                    .rethrowException()
                    .until(new Callable<Boolean>() {
                        @Override
                        public Boolean call() throws Exception {
                            iteration++;
                            if (LOG.isDebugEnabled()) {
                                String msg = "Uploading (iteration="+iteration+") for async script on "+SshjTool.this.toString()+" (for "+getSummary()+")";
                                // The first attempt is routine, so only retries are logged at debug.
                                if (iteration == 1) {
                                    LOG.trace(msg);
                                } else {
                                    LOG.debug(msg);
                                }
                            }
                            // Encode as UTF-8 explicitly (as done for the status messages below) rather than
                            // relying on the JVM's platform-default charset.
                            copyToServer(ImmutableMap.of("permissions", "0700"), toUTF8ByteArray(scriptContents), scriptPath);
                            return true;
                        }})
                    .run();

            if (!uploadSuccess) {
                // Unexpected! Should have either returned true or have rethrown the exception; should never get false.
                String msg = "Unexpected state: repeated failure for async script upload on "+SshjTool.this.toString()+" ("+getSummary()+")";
                LOG.warn(msg+"; rethrowing");
                throw new IllegalStateException(msg);
            }

            // Execute script asynchronously
            int execResult = asInt(acquire(new ShellAction(buildRunScriptCommand(), inCallback, out, err, execTimeout)), -1);
            if (execResult != 0) return execResult;

            // Long polling to get the status
            try {
                final AtomicReference<Integer> result = new AtomicReference<Integer>();
                boolean success = Repeater.create("async script long-poll on "+SshjTool.this.toString()+" (for "+getSummary()+")")
                        .backoffTo(maxDelayBetweenPolls)
                        .limitTimeTo(execTimeout)
                        .until(new Callable<Boolean>() {
                            @Override
                            public Boolean call() throws Exception {
                                iteration++;
                                if (LOG.isDebugEnabled()) LOG.debug("Doing long-poll (iteration="+iteration+") for async script to complete on "+SshjTool.this.toString()+" (for "+getSummary()+")");
                                // A null status means "not finished yet / retry"; anything else ends polling.
                                Integer exitstatus = longPoll();
                                result.set(exitstatus);
                                return exitstatus != null;
                            }})
                        .run();

                if (!success) {
                    // Timed out
                    String msg = "Timeout for async script to complete on "+SshjTool.this.toString()+" ("+getSummary()+")";
                    LOG.warn(msg+"; rethrowing");
                    throw new TimeoutException(msg);
                }
                return result.get();

            } catch (Exception e) {
                LOG.debug("Problem polling for async script on "+SshjTool.this.toString()+" (for "+getSummary()+"); rethrowing after deleting temporary files", e);
                throw Exceptions.propagate(e);
            } finally {
                // Delete the temporary files created (and the `tail -c` commands that might have been left behind by long-polls).
                // Using pollTimeout so doesn't wait forever, but waits for a reasonable (configurable) length of time.
                // TODO also execute this if the `buildRunScriptCommand` fails, as that might have left files behind?
                try {
                    int execDeleteResult = asInt(acquire(new ShellAction(deleteTemporaryFilesCommand(), inCallback, out, err, pollTimeout)), -1);
                    if (execDeleteResult != 0) {
                        LOG.debug("Problem deleting temporary files of async script on "+SshjTool.this.toString()+" (for "+getSummary()+"): exit status "+execDeleteResult);
                    }
                } catch (Exception e) {
                    // Clean-up is best-effort: only fatal errors escape; everything else is logged.
                    Exceptions.propagateIfFatal(e);
                    LOG.debug("Problem deleting temporary files of async script on "+SshjTool.this.toString()+" (for "+getSummary()+"); continuing", e);
                }
            }
        }

        /**
         * Performs one long-poll for the script's output and exit status.
         *
         * @return the exit status to report (which ends polling); or {@code null} to request another
         *         poll (e.g. the poll timed out, the connection failed, or the status is not yet known)
         */
        Integer longPoll() throws IOException {
            // Long-polling to get stdout, stderr + exit status of async task.
            // If our long-poll disconnects, we will just re-execute.
            // We wrap the stdout/stderr so that we can get the size count.
            // If we disconnect, we will pick up from that position of the stream.
            // TODO Additional stdout/stderr written by buildLongPollCommand() could interfere,
            //      causing us to miss some characters.

            // `timer` started before the upload, but the long-poll Repeater's time limit only starts
            // once the script has been launched. The remaining budget can therefore already be
            // negative here; clamp it at zero so no negative timeout reaches the poll command,
            // acquire() or the ssh join timeout.
            long remainingMillis = Math.max(0L, execTimeout.toMilliseconds() - timer.elapsed(TimeUnit.MILLISECONDS));
            Duration nextPollTimeout = Duration.min(pollTimeout, Duration.millis(remainingMillis));
            CountingOutputStream countingOut = (out == null) ? null : new CountingOutputStream(out);
            CountingOutputStream countingErr = (err == null) ? null : new CountingOutputStream(err);
            List<String> pollCommand = buildLongPollCommand(stdoutCount, stderrCount, nextPollTimeout);
            // Give the ssh session some slack beyond the poll's own timeout.
            Duration sshJoinTimeout = nextPollTimeout.add(Duration.TEN_SECONDS);
            ShellAction action = new ShellAction(pollCommand, countingOut, countingErr, sshJoinTimeout);

            int longPollResult;
            try {
                longPollResult = asInt(acquire(action, 3, nextPollTimeout), -1);
            } catch (RuntimeTimeoutException e) {
                if (LOG.isDebugEnabled()) LOG.debug("Long-poll timed out on "+SshjTool.this.toString()+" (for "+getSummary()+"): "+e);
                return null;
            }
            stdoutCount += (countingOut == null) ? 0 : countingOut.getCount();
            stderrCount += (countingErr == null) ? 0 : countingErr.getCount();

            if (longPollResult == 0) {
                if (LOG.isDebugEnabled()) LOG.debug("Long-poll succeeded (exit status 0) on "+SshjTool.this.toString()+" (for "+getSummary()+")");
                return longPollResult; // success

            } else if (longPollResult == -1) {
                // probably a connection failure; try again
                if (LOG.isDebugEnabled()) LOG.debug("Long-poll received exit status -1; will retry on "+SshjTool.this.toString()+" (for "+getSummary()+")");
                return null;

            } else if (longPollResult == 125) {
                // 125 is the special code for timeout in long-poll (see buildLongPollCommand).
                // However, there is a tiny chance that the underlying command might have returned that exact exit code!
                // Don't treat a timeout as a "consecutiveSshFailure".
                if (LOG.isDebugEnabled()) LOG.debug("Long-poll received exit status "+longPollResult+"; most likely timeout; retrieving actual status on "+SshjTool.this.toString()+" (for "+getSummary()+")");
                return retrieveStatusCommand();

            } else {
                // want to double-check whether this is the exit-code from the async process, or
                // some unexpected failure in our long-poll command.
                if (LOG.isDebugEnabled()) LOG.debug("Long-poll received exit status "+longPollResult+"; retrieving actual status on "+SshjTool.this.toString()+" (for "+getSummary()+")");
                Integer result = retrieveStatusCommand();
                if (result != null) {
                    return result;
                }
            }

            // Only reached when the status could not be determined after an unexpected exit code.
            consecutiveSshFailures++;
            if (consecutiveSshFailures > maxConsecutiveSshFailures) {
                LOG.warn("Aborting on "+consecutiveSshFailures+" consecutive ssh connection errors (return -1) when polling for async script to complete on "+SshjTool.this.toString()+" ("+getSummary()+")");
                return -1;
            } else {
                LOG.info("Retrying after ssh connection error when polling for async script to complete on "+SshjTool.this.toString()+" ("+getSummary()+")");
                return null;
            }
        }

        /**
         * Runs {@code buildRetrieveStatusCommand()} to read the script's exit status directly.
         *
         * @return the parsed exit status; or {@code null} if the command succeeded with empty output
         *         (script not finished) or returned -1 (probably a connection failure); otherwise the
         *         status command's own non-zero exit code, after copying its output to out/err
         */
        Integer retrieveStatusCommand() throws IOException {
            // want to double-check whether this is the exit-code from the async process, or
            // some unexpected failure in our long-poll command.
            ByteArrayOutputStream statusOut = new ByteArrayOutputStream();
            ByteArrayOutputStream statusErr = new ByteArrayOutputStream();
            int statusResult = asInt(acquire(new ShellAction(buildRetrieveStatusCommand(), statusOut, statusErr, execTimeout)), -1);

            if (statusResult == 0) {
                // The status we retrieved really is valid; return it.
                // TODO How to ensure no additional output in stdout/stderr when parsing below?
                // Decode explicitly as UTF-8 rather than with the platform-default charset.
                String statusOutStr = statusOut.toString("UTF-8").trim();
                if (Strings.isEmpty(statusOutStr)) {
                    // suggests not yet completed; will retry with long-poll
                    if (LOG.isDebugEnabled()) LOG.debug("Long-poll retrieved status directly; command successful but no result available on "+SshjTool.this.toString()+" (for "+getSummary()+")");
                    return null;
                } else {
                    if (LOG.isDebugEnabled()) LOG.debug("Long-poll retrieved status directly; returning '"+statusOutStr+"' on "+SshjTool.this.toString()+" (for "+getSummary()+")");
                    int result = Integer.parseInt(statusOutStr);
                    return result;
                }

            } else if (statusResult == -1) {
                // probably a connection failure; try again with long-poll
                if (LOG.isDebugEnabled()) LOG.debug("Long-poll retrieving status directly received exit status -1; will retry on "+SshjTool.this.toString()+" (for "+getSummary()+")");
                return null;

            } else {
                // Surface the failed status command's output to the caller's streams. The header line is
                // newline-terminated so it does not run into the raw output that follows it.
                if (out != null) {
                    out.write(toUTF8ByteArray("retrieving status failed with exit code "+statusResult+" (stdout follow)\n"));
                    out.write(statusOut.toByteArray());
                }
                if (err != null) {
                    err.write(toUTF8ByteArray("retrieving status failed with exit code "+statusResult+" (stderr follow)\n"));
                    err.write(statusErr.toByteArray());
                }

                if (LOG.isDebugEnabled()) LOG.debug("Long-poll retrieving status failed; returning "+statusResult+" on "+SshjTool.this.toString()+" (for "+getSummary()+")");
                return statusResult;
            }
        }
    }.run();
}