public void bindRuntime()

in streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkRuntime.java [216:255]


  /**
   * Prepares the runtime, starts this runtime on a new thread and, unless the
   * configuration is in mini cluster mode, waits for the job to be reported as
   * {@code RUNNING}.
   *
   * <p>The job status is polled once per second for at most 60 attempts. A failed
   * status lookup is treated as "not deployed yet" and retried on the next attempt.
   *
   * @throws SpRuntimeException if preparing or starting the runtime fails, if the
   *                            waiting thread is interrupted, or if the job is not
   *                            {@code RUNNING} after the last polling attempt
   */
  public void bindRuntime() throws SpRuntimeException {
    // Polling budget: 60 attempts, one second apart.
    final int maxAttempts = 60;
    final long pollIntervalMillis = 1000;

    try {
      prepareRuntime();

      Thread thread = new Thread(this);
      thread.start();

      // TODO find a better solution
      // The loop waits until the job is deployed.
      // When the deployment takes longer than 60 seconds, a timeout error is raised.
      // This check is not needed when the execution environment is set to local.
      if (!config.isMiniClusterMode()) {
        boolean isDeployed = false;
        int attempts = 0;
        do {
          attempts++;
          try {
            Thread.sleep(pollIntervalMillis);
            Optional<JobStatusMessage> statusMessageOpt =
                getJobStatus(runtimeParameters.getModel().getElementId());
            isDeployed = statusMessageOpt.isPresent()
                && "RUNNING".equals(statusMessageOpt.get().getJobState().name());
          } catch (InterruptedException e) {
            // Restore the interrupt flag and stop waiting; continuing would make
            // every following sleep fail immediately.
            Thread.currentThread().interrupt();
            throw new SpRuntimeException("Error: Interrupted while waiting for the Flink job to be deployed");
          } catch (Exception ignored) {
            // Best effort: a failed status lookup counts as "not deployed yet"
            // and is retried on the next attempt.
          }
        } while (!isDeployed && attempts < maxAttempts);

        // Decide on the deployment flag, not on the attempt counter: a job that
        // becomes RUNNING on the very last attempt must not be reported as a timeout.
        if (!isDeployed) {
          throw new SpRuntimeException("Error: Timeout reached when trying to connect to Flink Job Controller");
        }
      }

    } catch (Exception e) {
      // NOTE(review): only the message is propagated here; consider passing the
      // cause along if SpRuntimeException offers a suitable constructor.
      e.printStackTrace();
      throw new SpRuntimeException(e.getMessage());
    }
  }