in systests/src/main/java/org/apache/qpid/systest/core/AbstractSpawnQpidBrokerAdmin.java [206:320]
/**
 * Spawns the broker as a child process and blocks until it reports readiness or the
 * permitted start-up time elapses.
 * <p>
 * The process is built via {@code createBrokerProcessBuilder}, with stderr merged into stdout.
 * Its output is consumed on a dedicated non-daemon thread by a {@code BrokerSystemOutputHandler},
 * which counts down a latch once the broker is ready. On success {@code _process},
 * {@code _executorService}, {@code _pid} and {@code _ports} are populated. On any failure the
 * process is destroyed, the executor is shut down and all four fields are reset to {@code null}.
 * <p>
 * The permitted start-up time is read from the system property named by
 * {@code SYSTEST_PROPERTY_SPAWN_BROKER_STARTUP_TIME} and defaults to 30000 ms.
 *
 * @param testClass               test class on whose behalf the broker is spawned; its name is
 *                                exported to the child via the {@code QPID_PNAME} environment variable
 * @param method                  test method (not used by this implementation)
 * @param readyLogPattern         pattern handed to the output handler to recognise the "ready" line
 * @param stopLogPattern          pattern handed to the output handler to recognise the "stopped" line
 * @param portListeningLogPattern pattern handed to the output handler to detect listening ports
 * @param processPIDLogPattern    pattern handed to the output handler to detect the broker PID
 * @param currentWorkDirectory    working folder passed to {@code createBrokerProcessBuilder}
 * @throws IOException          if the broker process cannot be created or started
 * @throws BrokerAdminException if the broker does not become ready in time, its PID or ports are
 *                              not detected, it exits prematurely, or the waiting thread is
 *                              interrupted
 */
protected void runBroker(final Class testClass,
                         final Method method,
                         final String readyLogPattern,
                         final String stopLogPattern,
                         final String portListeningLogPattern,
                         final String processPIDLogPattern, String currentWorkDirectory) throws IOException
{
    LOGGER.debug("Spawning broker working folder: {}", currentWorkDirectory);
    int startUpTime = Integer.getInteger(SYSTEST_PROPERTY_SPAWN_BROKER_STARTUP_TIME, 30000);
    LOGGER.debug("Spawning broker permitted start-up time: {}", startUpTime);

    ProcessBuilder processBuilder = createBrokerProcessBuilder(currentWorkDirectory, testClass);
    // Merge stderr into stdout so a single handler thread sees all broker output.
    processBuilder.redirectErrorStream(true);
    Map<String, String> processEnvironment = processBuilder.environment();
    processEnvironment.put("QPID_PNAME", String.format("-DPNAME=QPBRKR -DTNAME=\"%s\"", testClass.getName()));

    CountDownLatch readyLatch = new CountDownLatch(1);
    long startTime = System.currentTimeMillis();
    LOGGER.debug("Starting broker process");
    _process = processBuilder.start();

    BrokerSystemOutputHandler brokerSystemOutputHandler = new BrokerSystemOutputHandler(_process.getInputStream(),
                                                                                        getLogConsumer(),
                                                                                        readyLogPattern,
                                                                                        stopLogPattern,
                                                                                        processPIDLogPattern,
                                                                                        portListeningLogPattern,
                                                                                        readyLatch,
                                                                                        getClass().getName()
    );

    boolean brokerStarted = false;
    _executorService = Executors.newFixedThreadPool(1, new ThreadFactory()
    {
        @Override
        public Thread newThread(final Runnable r)
        {
            Thread t = new Thread(r, BrokerSystemOutputHandler.class.getSimpleName());
            t.setDaemon(false);
            return t;
        }
    });
    try
    {
        _executorService.submit(brokerSystemOutputHandler);
        if (!readyLatch.await(startUpTime, TimeUnit.MILLISECONDS))
        {
            LOGGER.warn("Spawned broker failed to become ready within {} ms. Ready line '{}'",
                        startUpTime, readyLogPattern);
            // The pattern reported here is the ready pattern, so label it as such.
            throw new BrokerAdminException(String.format(
                    "Broker failed to become ready within %d ms. Ready line : %s",
                    startUpTime,
                    readyLogPattern));
        }

        _pid = brokerSystemOutputHandler.getPID();
        _ports = brokerSystemOutputHandler.getAmqpPorts();
        if (_pid == -1)
        {
            throw new BrokerAdminException("Broker PID is not detected");
        }
        if (_ports.size() == 0)
        {
            throw new BrokerAdminException("Broker port is not detected");
        }

        try
        {
            // Test that the broker is still running and hasn't exited unexpectedly:
            // exitValue() only returns normally once the process has terminated.
            int exit = _process.exitValue();
            LOGGER.info("broker aborted: {}", exit);
            throw new BrokerAdminException("broker aborted: " + exit);
        }
        catch (IllegalThreadStateException e)
        {
            // this is expected if the broker started successfully (process still alive)
        }

        LOGGER.info("Broker was started successfully within {} milliseconds, broker PID {}",
                    System.currentTimeMillis() - startTime,
                    _pid);
        LOGGER.info("Broker ports: {}", _ports);
        brokerStarted = true;
    }
    catch (RuntimeException e)
    {
        throw e;
    }
    catch (InterruptedException e)
    {
        // Restore the interrupt flag for callers further up the stack and report the failed
        // start-up explicitly, rather than returning normally with the broker state cleared.
        Thread.currentThread().interrupt();
        throw new BrokerAdminException("Interrupted whilst waiting for broker to become ready", e);
    }
    catch (Exception e)
    {
        throw new BrokerAdminException(String.format("Unexpected exception on broker startup: %s", e), e);
    }
    finally
    {
        if (!brokerStarted)
        {
            // Roll back everything created above so that no half-started broker is left behind.
            LOGGER.warn("Broker failed to start");
            _process.destroy();
            _process = null;
            _executorService.shutdown();
            _executorService = null;
            _ports = null;
            _pid = null;
        }
    }
}