in streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkRuntime.java [216:255]
/**
 * Prepares the runtime, starts this instance on a new thread and, unless the
 * configuration is in mini-cluster mode, blocks until the job is reported as
 * {@code RUNNING}.
 *
 * <p>The job status is polled once per second for at most 60 attempts. Failures of
 * an individual status lookup are ignored and retried on the next attempt.
 *
 * @throws SpRuntimeException if preparing the runtime fails, if the waiting thread is
 *                            interrupted, or if the job is not reported as
 *                            {@code RUNNING} within the polling budget
 */
public void bindRuntime() throws SpRuntimeException {
  // Polling budget for the deployment check: 60 attempts, one second apart.
  final int maxAttempts = 60;
  final long pollIntervalMillis = 1000;

  try {
    prepareRuntime();
    Thread thread = new Thread(this);
    thread.start();
    // TODO find a better solution
    // The loop waits until the job is deployed.
    // When the deployment takes longer than 60 seconds, a timeout error is raised.
    // This check is not needed when the execution environment is set to local.
    if (!config.isMiniClusterMode()) {
      boolean isDeployed = false;
      int count = 0;
      do {
        count++;
        try {
          Thread.sleep(pollIntervalMillis);
          Optional<JobStatusMessage> statusMessageOpt =
              getJobStatus(runtimeParameters.getModel().getElementId());
          isDeployed = statusMessageOpt.isPresent()
              && "RUNNING".equals(statusMessageOpt.get().getJobState().name());
        } catch (InterruptedException e) {
          // Restore the interrupt flag for callers and stop waiting; continuing to
          // poll would make every following sleep fail immediately.
          Thread.currentThread().interrupt();
          throw new SpRuntimeException("Error: Interrupted while waiting for the Flink job to be deployed");
        } catch (Exception ignored) {
          // Deliberately best-effort: a failed status lookup is treated as
          // "not deployed yet" and retried on the next attempt.
        }
      } while (!isDeployed && count < maxAttempts);

      // Decide on the deployment state, not on the attempt counter: a job that reaches
      // RUNNING on the very last attempt must not be reported as a timeout.
      if (!isDeployed) {
        throw new SpRuntimeException("Error: Timeout reached when trying to connect to Flink Job Controller");
      }
    }
  } catch (Exception e) {
    // NOTE(review): only the message is propagated and the original cause is dropped;
    // if SpRuntimeException offers a (message, cause) constructor, prefer it here and
    // replace printStackTrace() with the project's logger - confirm what is available.
    e.printStackTrace();
    throw new SpRuntimeException(e.getMessage());
  }
}