private int attachToServices()

in uimaj-cpe/src/main/java/org/apache/uima/collection/impl/cpm/container/deployer/vinci/VinciCasProcessorDeployer.java [735:918]


  private int attachToServices(boolean redeploy, String aServiceUri, int howMany,
          ProcessingContainer aProcessingContainer) throws Exception {
    VinciServiceInfo serviceInfo = null;

    // Retrieve configuration information for the CasProcessor. This is the configuration from the
    // CPE descriptor
    CasProcessorConfiguration casProcessorConfig = aProcessingContainer
            .getCasProcessorConfiguration();
    ArrayList serviceList = null;
    // Check if this Cas Processor has exclusive service access. Meaning it requires service per
    // proxy.
    boolean exclusiveAccess = false;
    String serviceAccess = casProcessorConfig.getDeploymentParameter("service-access");
    if (serviceAccess != null && serviceAccess.equalsIgnoreCase("exclusive")) {
      if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
        UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
                "initialize", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
                "UIMA_CPM_cp_with_exclusive_access__FINEST",
                new Object[] { Thread.currentThread().getName(), aProcessingContainer.getName(),
                    casProcessorConfig.getDeploymentType() });
      }
      exclusiveAccess = true;
      // The following is true if the service just crashed and we are trying to recover. The
      // container in this case
      // should contain a reference to the Cas Processor that failed.
      if (((ProcessingContainer_Impl) aProcessingContainer).failedCasProcessorList.size() > 0) {
        // In case the service starts on the same port as before it crashed, mark it as
        // available in the current service list
        if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
          UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
                  "initialize", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
                  "UIMA_CPM_retrieve_cp_from_failed_cplist__FINEST",
                  new Object[] { Thread.currentThread().getName(), aProcessingContainer.getName(),
                      casProcessorConfig.getDeploymentType() });
        }
        CasProcessor processor = (CasProcessor) ((ProcessingContainer_Impl) aProcessingContainer).failedCasProcessorList
                .get(0);
        // Extract old (stale) proxy from the CAS Processor
        VinciTAP tap = ((NetworkCasProcessorImpl) processor).getProxy();
        if (tap != null) {
          // Since the new service may have started on the same machine and the same port, make sure
          // that the
          // current list is updated so that the service can be again connected to.
          String service_host = tap.getServiceHost();
          int service_port = tap.getServicePort();
          // Go through all services in the in-use service list and try to find one that matches
          // host and port
          // If found, change the state of the service to available again.
          for (int i = 0; currentServiceList != null && i < currentServiceList.size(); i++) {
            VinciServiceInfo si = (VinciServiceInfo) currentServiceList.get(i);
            if (si.getHost().equals(service_host) && si.getPort() == service_port) {
              si.setAvailable(true);
            }
          }
        }

      }

      // Retrieve a new service list from the VNS
      serviceList = getNewServiceList(aServiceUri, casProcessorConfig);

      if (serviceList == null || serviceList.size() == 0 && !redeploy) {
        throw new ResourceConfigurationException(CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
                "UIMA_CPM_no_service_in_vns__FINEST",
                new Object[] { Thread.currentThread().getName(), aServiceUri,
                    casProcessorConfig.getDeploymentType(),
                    casProcessorConfig.getDeploymentParameter(Constants.VNS_HOST),
                    casProcessorConfig.getDeploymentParameter(Constants.VNS_PORT) });
      }
      // When redeploying/reconnecting, we only interested in one proxy
      if (redeploy) {
        howMany = 1; // override
      }
      // If this is the first time through here, make sure the service list is cached. This is the
      // current service list.
      if (currentServiceList == null) {
        currentServiceList = serviceList;
      } else {
        // Copy the state of each service from current list to new list. Just change the state
        // of service from unassigned to assigned if it happens not to be in use.
        int newServiceCount = VNSQuery.findUnassigned(currentServiceList, serviceList);
        if (newServiceCount > 0) {
          // new Services have been started. Use the new list as current.
          currentServiceList.clear();
          currentServiceList = serviceList;
        }
      }

    }
    int succesfullConnections = 0;

    int rC = casProcessorConfig.getMaxRestartCount();

    int maxTimeToWait = casProcessorConfig.getMaxTimeToWaitBetweenRetries();
    // Never sleep indefinitely. Override if the maxTimeToWait = 0
    if (maxTimeToWait == 0) {
      maxTimeToWait = WAIT_TIME;
    }

    for (int i = 0; i < howMany; i++) {
      if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
        UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
                "initialize", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
                "UIMA_CPM_activating_service__FINEST",
                new Object[] { Thread.currentThread().getName(), aProcessingContainer.getName(),
                    String.valueOf(i), String.valueOf(howMany) });
      }
      int restartCount = 0;
      // flag to indicate that we should sleep between retries when connecting to a fenced service.
      // First time through the do-while loop below dont sleep though.
      boolean sleepBetweenRetries = false;
      // Attempt to connect to remote service and activate the proxy.

      do {
        try {
          if (exclusiveAccess) {
            serviceInfo = getNextAvailable(serviceList);
            if (serviceInfo == null) {
              if (!redeploy) {
                // No more services available. Report how many services we attached to so far
                return succesfullConnections;
              }
              sleepBetweenRetries = true;
            }
          }
          if (sleepBetweenRetries) {
            try {
              Thread.sleep(maxTimeToWait);
            } catch (InterruptedException iex) {
            }

            if (serviceInfo == null && exclusiveAccess) {
              // Get a new service list from the VNS
              restartCount++;

              serviceList = getNewServiceList(aServiceUri, casProcessorConfig);

              // Copy the state of each service from current list to new list. Just change the state
              // of service from unassigned to assigned if it happens to be in use.
              int newServiceCount = VNSQuery.findUnassigned(currentServiceList, serviceList);
              if (newServiceCount > 0) {
                currentServiceList.clear();
                currentServiceList = serviceList;
              }
              continue;
            }
          }
          if (exclusiveAccess) {
            activateProcessor(casProcessorConfig, serviceInfo.getHost(), serviceInfo.getPort(),
                    aProcessingContainer, redeploy);
            serviceInfo.setAvailable(false);
          } else {
            // Create and connect a new proxy to the service
            activateProcessor(casProcessorConfig, aServiceUri, aProcessingContainer, redeploy);
          }
          succesfullConnections++;
          // on successfull connection, just brake out of the retry loop
          break;
        } catch (ResourceInitializationException ex) {
          if (ex.getCause() instanceof ServiceException
                  || ex.getCause() instanceof ServiceDownException) {
            if (UIMAFramework.getLogger().isLoggable(Level.WARNING)) {
              UIMAFramework.getLogger(this.getClass()).logrb(Level.WARNING,
                      this.getClass().getName(), "initialize", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
                      "UIMA_CPM_activating_service__WARNING", new Object[] {
                          Thread.currentThread().getName(), aProcessingContainer.getName() });
            }
            // Increment number of CasProcessor restarts (redeploys)
            restartCount++;
          } else {
            restartCount = rC + 1; // Force termination of the loop
          }
        }
      } while (restartCount <= rC);

      // If exceeding defined max number of retries, take associated action with this condition
      // CasProcessor may be configured to Terminate the CPM, Disable itself or Disregard this
      // condition and continue.
      if (restartCount > rC) {
        handleMaxRestartThresholdReached(aProcessingContainer);
      }
    }
    return succesfullConnections;
  }