protected int execScriptAsyncAndPoll()

in brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/internal/ssh/sshj/SshjTool.java [355:550]


    /**
     * Runs {@code commands} as a script executed asynchronously on the remote host, then long-polls for its
     * exit status.
     * <p>
     * The sequence is:
     * <ol>
     *   <li>upload the script generated by {@code toScript(props, commands, env)} to {@code scriptPath} with
     *       permissions {@code 0700}, attempting up to 3 times;</li>
     *   <li>launch it with {@code buildRunScriptCommand()}; a non-zero result from that is returned immediately;</li>
     *   <li>repeatedly long-poll with {@code buildLongPollCommand(...)}, streaming newly-produced stdout/stderr to
     *       {@code out}/{@code err}, until an exit status is obtained or {@code execTimeout} elapses;</li>
     *   <li>once launched, always run {@code deleteTemporaryFilesCommand()} afterwards, on a best-effort basis.</li>
     * </ol>
     *
     * @param props tool properties; {@code PROP_EXEC_ASYNC_POLLING_TIMEOUT} (default five minutes) caps the
     *        duration of each individual long-poll
     * @param commands the commands to put in the script
     * @param env environment variables passed to {@code toScript}
     * @return the script's exit status; or the launch command's result if that was non-zero; or the exit status
     *         of the status-retrieval command if that command itself failed; or -1 after too many consecutive
     *         long-poll failures
     * @throws RuntimeException (via {@code Exceptions.propagate}) if no exit status is obtained within
     *         {@code execTimeout} (wrapping a {@link TimeoutException}) or polling otherwise fails; upload errors
     *         are presumably rethrown by the upload {@code Repeater} once all attempts fail
     */
    protected int execScriptAsyncAndPoll(final Map<String,?> props, final List<String> commands, final Map<String,?> env) {
        return new ToolAbstractAsyncExecScript(props) {
            private int maxConsecutiveSshFailures = 3;
            private Duration maxDelayBetweenPolls = Duration.seconds(20);
            // Upper bound for a single long-poll; the overall limit is execTimeout.
            private Duration pollTimeout = getOptionalVal(props, PROP_EXEC_ASYNC_POLLING_TIMEOUT, Duration.FIVE_MINUTES);
            // Lower bound (ms) for a single long-poll, so that a poll issued when execTimeout has (nearly) elapsed
            // never asks for a zero or negative timeout.
            private final long minPollTimeoutMillis = 1000;
            // Shared by the upload attempts and the long-polls; only affects logging.
            private int iteration = 0;
            // Reset whenever a long-poll completes with the normal timeout code; see longPoll().
            private int consecutiveSshFailures = 0;
            // Bytes of stdout/stderr already delivered to out/err; each long-poll resumes after these.
            private int stdoutCount = 0;
            private int stderrCount = 0;
            // Started at the beginning of run(); used to limit each poll to the time remaining within execTimeout.
            private Stopwatch timer;
            
            public int run() {
                timer = Stopwatch.createStarted();
                final String scriptContents = toScript(props, commands, env);
                if (LOG.isTraceEnabled()) LOG.trace("Running shell command at {} as async script: {}", host, scriptContents);
                // Encode explicitly as UTF-8 rather than with the platform default charset, consistent with the
                // status messages this tool writes to out/err.
                final byte[] scriptBytes = toUTF8ByteArray(scriptContents);
                
                // Upload script; try repeatedly because have seen timeout intermittently on vcloud-director (BROOKLYN-106 related).
                boolean uploadSuccess = Repeater.create("async script upload on "+SshjTool.this.toString()+" (for "+getSummary()+")")
                        .backoffTo(maxDelayBetweenPolls)
                        .limitIterationsTo(3)
                        .rethrowException()
                        .until(new Callable<Boolean>() {
                            @Override
                            public Boolean call() throws Exception {
                                iteration++;
                                if (LOG.isDebugEnabled()) {
                                    String msg = "Uploading (iteration="+iteration+") for async script on "+SshjTool.this.toString()+" (for "+getSummary()+")";
                                    if (iteration == 1) {
                                        LOG.trace(msg);
                                    } else {
                                        LOG.debug(msg);
                                    }
                                }
                                copyToServer(ImmutableMap.of("permissions", "0700"), scriptBytes, scriptPath);
                                return true;
                            }})
                        .run();
                
                if (!uploadSuccess) {
                    // Unexpected! Should have either returned true or have rethrown the exception; should never get false.
                    String msg = "Unexpected state: repeated failure for async script upload on "+SshjTool.this.toString()+" ("+getSummary()+")";
                    LOG.warn(msg+"; rethrowing");
                    throw new IllegalStateException(msg);
                }
                
                // Execute script asynchronously
                int execResult = asInt(acquire(new ShellAction(buildRunScriptCommand(), out, err, execTimeout)), -1);
                if (execResult != 0) return execResult;

                // Long polling to get the status
                try {
                    final AtomicReference<Integer> result = new AtomicReference<Integer>();
                    boolean success = Repeater.create("async script long-poll on "+SshjTool.this.toString()+" (for "+getSummary()+")")
                            .backoffTo(maxDelayBetweenPolls)
                            .limitTimeTo(execTimeout)
                            .until(new Callable<Boolean>() {
                                @Override
                                public Boolean call() throws Exception {
                                    iteration++;
                                    if (LOG.isDebugEnabled()) LOG.debug("Doing long-poll (iteration="+iteration+") for async script to complete on "+SshjTool.this.toString()+" (for "+getSummary()+")");
                                    Integer exitstatus = longPoll();
                                    result.set(exitstatus);
                                    return exitstatus != null;
                                }})
                            .run();
                    
                    if (!success) {
                        // Timed out
                        String msg = "Timeout for async script to complete on "+SshjTool.this.toString()+" ("+getSummary()+")";
                        LOG.warn(msg+"; rethrowing");
                        throw new TimeoutException(msg);
                    }
                    
                    // Non-null: the Repeater only reports success once longPoll() returned a non-null status.
                    return result.get();
                    
                } catch (Exception e) {
                    LOG.debug("Problem polling for async script on "+SshjTool.this.toString()+" (for "+getSummary()+"); rethrowing after deleting temporary files", e);
                    throw Exceptions.propagate(e);
                } finally {
                    // Delete the temporary files created (and the `tail -c` commands that might have been left behind by long-polls).
                    // Using pollTimeout so doesn't wait forever, but waits for a reasonable (configurable) length of time.
                    // TODO also execute this if the `buildRunScriptCommand` fails, as that might have left files behind?
                    try {
                        int execDeleteResult = asInt(acquire(new ShellAction(deleteTemporaryFilesCommand(), out, err, pollTimeout)), -1);
                        if (execDeleteResult != 0) {
                            LOG.debug("Problem deleting temporary files of async script on "+SshjTool.this.toString()+" (for "+getSummary()+"): exit status "+execDeleteResult);
                        }
                    } catch (Exception e) {
                        Exceptions.propagateIfFatal(e);
                        LOG.debug("Problem deleting temporary files of async script on "+SshjTool.this.toString()+" (for "+getSummary()+"); continuing", e);
                    }
                }
            }
            
            /**
             * Performs a single long-poll of the async script, streaming any new stdout/stderr to out/err.
             *
             * @return the exit status to finish with, or {@code null} if the caller should poll again
             * @throws IOException if writing to out/err fails while retrieving the status directly
             */
            Integer longPoll() throws IOException {
                // Long-polling to get stdout, stderr + exit status of async task.
                // If our long-poll disconnects, we will just re-execute.
                // We wrap the stdout/stderr so that we can get the size count. 
                // If we disconnect, we will pick up from that char of the stream.
                // TODO Additional stdout/stderr written by buildLongPollCommand() could interfere, 
                //      causing us to miss some characters.
                long remainingMillis = execTimeout.toMilliseconds() - timer.elapsed(TimeUnit.MILLISECONDS);
                // The stopwatch started before the upload, so the remaining time can already be <= 0 here even
                // though the polling Repeater (started later) allows another iteration; clamp to a small minimum.
                Duration nextPollTimeout = Duration.min(pollTimeout, Duration.millis(Math.max(remainingMillis, minPollTimeoutMillis)));
                CountingOutputStream countingOut = (out == null) ? null : new CountingOutputStream(out);
                CountingOutputStream countingErr = (err == null) ? null : new CountingOutputStream(err);
                List<String> pollCommand = buildLongPollCommand(stdoutCount, stderrCount, nextPollTimeout);
                Duration sshJoinTimeout = nextPollTimeout.add(Duration.TEN_SECONDS);
                ShellAction action = new ShellAction(pollCommand, countingOut, countingErr, sshJoinTimeout);
                
                int longPollResult;
                try {
                    longPollResult = asInt(acquire(action, 3, nextPollTimeout), -1);
                } catch (RuntimeTimeoutException e) {
                    if (LOG.isDebugEnabled()) LOG.debug("Long-poll timed out on "+SshjTool.this.toString()+" (for "+getSummary()+"): "+e);
                    return null;
                } finally {
                    // Always record what was already delivered to out/err, even if the poll timed out or failed
                    // part-way; otherwise the next poll would resume from the old offset and replay that output.
                    stdoutCount += (countingOut == null) ? 0 : countingOut.getCount();
                    stderrCount += (countingErr == null) ? 0 : countingErr.getCount();
                }
                
                if (longPollResult == 0) {
                    if (LOG.isDebugEnabled()) LOG.debug("Long-poll succeeded (exit status 0) on "+SshjTool.this.toString()+" (for "+getSummary()+")");
                    return longPollResult; // success
                    
                } else if (longPollResult == -1) {
                    // probably a connection failure; try again
                    if (LOG.isDebugEnabled()) LOG.debug("Long-poll received exit status -1; will retry on "+SshjTool.this.toString()+" (for "+getSummary()+")");
                    return null;

                } else if (longPollResult == 125) {
                    // 125 is the special code for timeout in long-poll (see buildLongPollCommand).
                    // However, there is a tiny chance that the underlying command might have returned that exact exit code!
                    // Don't treat a timeout as a "consecutiveSshFailure"; indeed the poll ran to completion, so the
                    // connection is healthy and any earlier failures are no longer consecutive.
                    consecutiveSshFailures = 0;
                    if (LOG.isDebugEnabled()) LOG.debug("Long-poll received exit status "+longPollResult+"; most likely timeout; retrieving actual status on "+SshjTool.this.toString()+" (for "+getSummary()+")");
                    return retrieveStatusCommand();

                } else {
                    // want to double-check whether this is the exit-code from the async process, or
                    // some unexpected failure in our long-poll command.
                    if (LOG.isDebugEnabled()) LOG.debug("Long-poll received exit status "+longPollResult+"; retrieving actual status on "+SshjTool.this.toString()+" (for "+getSummary()+")");
                    Integer result = retrieveStatusCommand();
                    if (result != null) {
                        return result;
                    }
                }
                    
                // Only reached when an unexpected long-poll exit status could not be confirmed via the status command.
                consecutiveSshFailures++;
                if (consecutiveSshFailures > maxConsecutiveSshFailures) {
                    LOG.warn("Aborting on "+consecutiveSshFailures+" consecutive ssh connection errors (return -1) when polling for async script to complete on "+SshjTool.this.toString()+" ("+getSummary()+")");
                    return -1;
                } else {
                    LOG.info("Retrying after ssh connection error when polling for async script to complete on "+SshjTool.this.toString()+" ("+getSummary()+")");
                    return null;
                }
            }
            
            /**
             * Reads the async script's exit status directly, using {@code buildRetrieveStatusCommand()}.
             *
             * @return the parsed exit status; {@code null} if the command succeeded with empty output (not yet
             *         completed) or returned -1 (probably a connection failure); otherwise the failing command's
             *         own exit status, after copying its stdout/stderr to out/err
             * @throws IOException if writing to out/err fails
             * @throws NumberFormatException if the retrieved status is not an integer
             */
            Integer retrieveStatusCommand() throws IOException {
                // want to double-check whether this is the exit-code from the async process, or
                // some unexpected failure in our long-poll command.
                ByteArrayOutputStream statusOut = new ByteArrayOutputStream();
                ByteArrayOutputStream statusErr = new ByteArrayOutputStream();
                int statusResult = asInt(acquire(new ShellAction(buildRetrieveStatusCommand(), statusOut, statusErr, execTimeout)), -1);
                
                if (statusResult == 0) {
                    // The status we retrieved really is valid; return it.
                    // TODO How to ensure no additional output in stdout/stderr when parsing below?
                    // Decode explicitly as UTF-8 rather than with the platform default charset.
                    String statusOutStr = statusOut.toString("UTF-8").trim();
                    if (Strings.isEmpty(statusOutStr)) {
                        // suggests not yet completed; will retry with long-poll
                        if (LOG.isDebugEnabled()) LOG.debug("Long-poll retrieved status directly; command successful but no result available on "+SshjTool.this.toString()+" (for "+getSummary()+")");
                        return null;
                    } else {
                        if (LOG.isDebugEnabled()) LOG.debug("Long-poll retrieved status directly; returning '"+statusOutStr+"' on "+SshjTool.this.toString()+" (for "+getSummary()+")");
                        int result = Integer.parseInt(statusOutStr);
                        return result;
                    }

                } else if (statusResult == -1) {
                    // probably a connection failure; try again with long-poll
                    if (LOG.isDebugEnabled()) LOG.debug("Long-poll retrieving status directly received exit status -1; will retry on "+SshjTool.this.toString()+" (for "+getSummary()+")");
                    return null;
                    
                } else {
                    // Surface the failing status command's output to the caller's streams before returning its code.
                    if (out != null) {
                        out.write(toUTF8ByteArray("retrieving status failed with exit code "+statusResult+" (stdout follow)"));
                        out.write(statusOut.toByteArray());
                    }
                    if (err != null) {
                        err.write(toUTF8ByteArray("retrieving status failed with exit code "+statusResult+" (stderr follow)"));
                        err.write(statusErr.toByteArray());
                    }
                    
                    if (LOG.isDebugEnabled()) LOG.debug("Long-poll retrieving status failed; returning "+statusResult+" on "+SshjTool.this.toString()+" (for "+getSummary()+")");
                    return statusResult;
                }
            }
        }.run();
    }