in zeppelin-plugins/launcher/docker/src/main/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterProcess.java [158:281]
public void start(String userName) throws IOException {
docker = DefaultDockerClient.builder().uri(URI.create(dockerHost)).build();
removeExistContainer(containerName);
final Map<String, List<PortBinding>> portBindings = new HashMap<>();
// Bind container ports to host ports
int intpServicePort = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
this.dockerIntpServicePort = String.valueOf(intpServicePort);
final String[] ports = {dockerIntpServicePort};
for (String port : ports) {
List<PortBinding> hostPorts = new ArrayList<>();
hostPorts.add(PortBinding.of("0.0.0.0", port));
portBindings.put(port, hostPorts);
}
final HostConfig hostConfig = HostConfig.builder()
.networkMode("host").portBindings(portBindings).build();
DockerSpecTemplate specTemplate = new DockerSpecTemplate();
specTemplate.loadProperties(getTemplateBindings());
URL urlTemplate = this.getClass().getResource(DOCKER_INTP_JINJA);
String template = Resources.toString(urlTemplate, StandardCharsets.UTF_8);
String dockerCommand = specTemplate.render(template);
int firstLineIsNewline = dockerCommand.indexOf("\n");
if (firstLineIsNewline == 0) {
dockerCommand = dockerCommand.replaceFirst("\n", "");
}
LOGGER.info("dockerCommand = {}", dockerCommand);
List<String> listEnv = getListEnvs();
LOGGER.info("docker listEnv = {}", listEnv);
// check if the interpreter process exit script
// if interpreter process exit, then container need exit
StringBuilder sbStartCmd = new StringBuilder();
sbStartCmd.append("sleep 20; ");
sbStartCmd.append("process=RemoteInterpreterServer; ");
sbStartCmd.append("RUNNING_PIDS=$(ps x | grep $process | grep -v grep | awk '{print $1}'); ");
sbStartCmd.append("while [ ! -z \"$RUNNING_PIDS\" ]; ");
sbStartCmd.append("do sleep 1; ");
sbStartCmd.append("RUNNING_PIDS=$(ps x | grep $process | grep -v grep | awk '{print $1}'); ");
sbStartCmd.append("done");
// Create container with exposed ports
final ContainerConfig containerConfig = ContainerConfig.builder()
.hostConfig(hostConfig)
.hostname(this.intpEventServerHost)
.image(containerImage)
.workingDir("/")
.env(listEnv)
.cmd("sh", "-c", sbStartCmd.toString())
.build();
try {
LOGGER.info("wait docker pull image {} ...", containerImage);
docker.pull(containerImage, new ProgressHandler() {
@Override
public void progress(ProgressMessage message) throws DockerException {
if (null != message.error()) {
LOGGER.error(message.toString());
}
}
});
final ContainerCreation containerCreation
= docker.createContainer(containerConfig, containerName);
String containerId = containerCreation.id();
// Start container
docker.startContainer(containerId);
copyRunFileToContainer(containerId);
execInContainer(containerId, dockerCommand, false);
} catch (DockerException e) {
throw new IOException(e);
} catch (InterruptedException e) {
// Restore interrupted state...
Thread.currentThread().interrupt();
throw new IOException("Docker preparations were interrupted.", e);
}
long startTime = System.currentTimeMillis();
long timeoutTime = startTime + getConnectTimeout();
// wait until interpreter send dockerStarted message through thrift rpc
synchronized (dockerStarted) {
LOGGER.info("Waiting for interpreter container to be ready");
while (!dockerStarted.get() && !Thread.currentThread().isInterrupted()) {
long timeToTimeout = timeoutTime - System.currentTimeMillis();
if (timeToTimeout <= 0) {
LOGGER.info("Interpreter docker creation is time out in {} seconds",
getConnectTimeout() / 1000);
stop();
throw new IOException(
"Launching zeppelin interpreter on docker is time out, kill it now");
}
try {
dockerStarted.wait(timeToTimeout);
} catch (InterruptedException e) {
// Restore interrupted state...
Thread.currentThread().interrupt();
stop();
throw new IOException("Remote interpreter is not accessible", e);
}
}
}
// waits for interpreter thrift rpc server ready
while (System.currentTimeMillis() - startTime < getConnectTimeout()) {
if (RemoteInterpreterUtils.checkIfRemoteEndpointAccessible(getHost(), getPort())) {
break;
} else {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
LOGGER.error(e.getMessage(), e);
// Restore interrupted state...
Thread.currentThread().interrupt();
}
}
}
}