in tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java [404:491]
private void restart() throws IOException, InterruptedException, TimeoutException {
if (process != null) {
LOG.debug("process still alive; trying to destroy it");
destroyForcibly();
boolean processEnded = process.waitFor(30, TimeUnit.SECONDS);
if (! processEnded) {
LOG.warn("pipesClientId={}: process has not yet ended", pipesClientId);
}
executorService.shutdownNow();
boolean shutdown = executorService.awaitTermination(30, TimeUnit.SECONDS);
if (! shutdown) {
LOG.warn("pipesClientId={}: executorService has not yet shutdown", pipesClientId);
}
synchronized (executorServiceLock) {
if (closed) {
throw new IllegalArgumentException("pipesClientId=" + pipesClientId +
": PipesClient closed");
}
executorService = Executors.newFixedThreadPool(1);
}
LOG.info("pipesClientId={}: restarting process", pipesClientId);
} else {
LOG.info("pipesClientId={}: starting process", pipesClientId);
}
ProcessBuilder pb = new ProcessBuilder(getCommandline());
pb.redirectError(ProcessBuilder.Redirect.INHERIT);
try {
process = pb.start();
} catch (Exception e) {
//Do we ever want this to be not fatal?!
LOG.error("failed to start client", e);
throw new FailedToStartClientException(e);
}
input = new DataInputStream(process.getInputStream());
output = new DataOutputStream(process.getOutputStream());
//wait for ready signal
final UnsynchronizedByteArrayOutputStream bos = UnsynchronizedByteArrayOutputStream.builder().get();
FutureTask<Integer> futureTask = new FutureTask<>(() -> {
int b = input.read();
int read = 1;
while (read < MAX_BYTES_BEFORE_READY && b != READY.getByte()) {
if (b == -1) {
throw new RuntimeException(getMsg("pipesClientId=" + pipesClientId + ": " +
"Couldn't start server -- read EOF before 'ready' byte.\n" +
" process isAlive=" + process.isAlive(), bos));
}
bos.write(b);
b = input.read();
read++;
}
if (read >= MAX_BYTES_BEFORE_READY) {
throw new RuntimeException(getMsg("pipesClientId=" + pipesClientId + ": " +
"Couldn't start server: read too many bytes before 'ready' byte.\n" +
" Make absolutely certain that your logger is not writing to " +
"stdout.\n", bos));
}
if (bos.size() > 0) {
LOG.warn("pipesClientId={}: From forked process before start byte: {}",
pipesClientId, bos.toString(StandardCharsets.UTF_8));
}
return 1;
});
long start = System.currentTimeMillis();
executorService.submit(futureTask);
try {
futureTask.get(pipesConfig.getStartupTimeoutMillis(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
destroyForcibly();
throw e;
} catch (ExecutionException e) {
LOG.error("pipesClientId=" + pipesClientId + ": couldn't start server", e);
destroyForcibly();
throw new RuntimeException(e);
} catch (TimeoutException e) {
long elapsed = System.currentTimeMillis() - start;
LOG.error("pipesClientId={} didn't receive ready byte from server within " +
"StartupTimeoutMillis {}; ms elapsed {}; did read >{}<",
pipesClientId, pipesConfig.getStartupTimeoutMillis(),
elapsed, bos.toString(StandardCharsets.UTF_8));
destroyForcibly();
throw e;
} finally {
futureTask.cancel(true);
}
}