in uimaj-cpe/src/main/java/org/apache/uima/collection/impl/cpm/container/deployer/vinci/VinciCasProcessorDeployer.java [735:918]
/**
 * Activates up to {@code howMany} proxies for the Cas Processor managed by the given container.
 * <p>
 * If the Cas Processor's {@code "service-access"} deployment parameter equals
 * {@code "exclusive"} (case-insensitive), each proxy is connected to its own service instance
 * (host/port) picked from the service list returned by {@code getNewServiceList}. Otherwise each
 * proxy is activated against {@code aServiceUri} directly.
 * <p>
 * Every connection is attempted in a retry loop bounded by the configured max restart count.
 * The first attempt is made immediately; after a recoverable failure (a
 * {@code ResourceInitializationException} caused by {@code ServiceException} or
 * {@code ServiceDownException}), or when no exclusive service is currently available during a
 * redeploy, the thread sleeps for the configured wait time before trying again. Once the restart
 * count exceeds the maximum, {@code handleMaxRestartThresholdReached} is invoked.
 *
 * @param redeploy
 *          {@code true} when reconnecting after a failure. In exclusive mode this overrides
 *          {@code howMany} to 1 and makes the method wait for a service to become available
 *          instead of returning early.
 * @param aServiceUri
 *          name of the service to connect to
 * @param howMany
 *          number of proxies to activate
 * @param aProcessingContainer
 *          container that owns the Cas Processor and supplies its configuration
 * @return the number of proxies that were successfully activated
 * @throws ResourceConfigurationException
 *           in exclusive mode, if the service list is {@code null}, or if it is empty and this is
 *           not a redeploy
 * @throws Exception
 *           any other failure propagated from the helpers called here
 */
private int attachToServices(boolean redeploy, String aServiceUri, int howMany,
        ProcessingContainer aProcessingContainer) throws Exception {
  VinciServiceInfo serviceInfo = null;
  // Configuration of the CasProcessor as defined in the CPE descriptor
  CasProcessorConfiguration casProcessorConfig = aProcessingContainer
          .getCasProcessorConfiguration();
  ArrayList serviceList = null;
  // Exclusive service access means the Cas Processor requires one service per proxy.
  boolean exclusiveAccess = false;
  String serviceAccess = casProcessorConfig.getDeploymentParameter("service-access");
  if ("exclusive".equalsIgnoreCase(serviceAccess)) {
    if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
      UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
              "initialize", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
              "UIMA_CPM_cp_with_exclusive_access__FINEST",
              new Object[] { Thread.currentThread().getName(), aProcessingContainer.getName(),
                  casProcessorConfig.getDeploymentType() });
    }
    exclusiveAccess = true;
    // A non-empty failed list means a service just crashed and we are trying to recover; the
    // container then holds a reference to the Cas Processor that failed.
    if (((ProcessingContainer_Impl) aProcessingContainer).failedCasProcessorList.size() > 0) {
      if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
        UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
                "initialize", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
                "UIMA_CPM_retrieve_cp_from_failed_cplist__FINEST",
                new Object[] { Thread.currentThread().getName(), aProcessingContainer.getName(),
                    casProcessorConfig.getDeploymentType() });
      }
      CasProcessor processor = (CasProcessor) ((ProcessingContainer_Impl) aProcessingContainer).failedCasProcessorList
              .get(0);
      // Extract the old (stale) proxy from the failed Cas Processor
      VinciTAP tap = ((NetworkCasProcessorImpl) processor).getProxy();
      if (tap != null) {
        // The replacement service may come back on the same host and port. Flag every cached
        // entry matching the stale proxy's endpoint as available so it can be connected to again.
        String service_host = tap.getServiceHost();
        int service_port = tap.getServicePort();
        for (int i = 0; currentServiceList != null && i < currentServiceList.size(); i++) {
          VinciServiceInfo si = (VinciServiceInfo) currentServiceList.get(i);
          if (si.getHost().equals(service_host) && si.getPort() == service_port) {
            si.setAvailable(true);
          }
        }
      }
    }
    // Retrieve a new service list from the VNS
    serviceList = getNewServiceList(aServiceUri, casProcessorConfig);
    // Parentheses spell out the operator precedence the original expression already had:
    // a null list always fails, an empty list fails only on initial deployment.
    if (serviceList == null || (serviceList.size() == 0 && !redeploy)) {
      throw new ResourceConfigurationException(CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
              "UIMA_CPM_no_service_in_vns__FINEST",
              new Object[] { Thread.currentThread().getName(), aServiceUri,
                  casProcessorConfig.getDeploymentType(),
                  casProcessorConfig.getDeploymentParameter(Constants.VNS_HOST),
                  casProcessorConfig.getDeploymentParameter(Constants.VNS_PORT) });
    }
    // When redeploying/reconnecting, we are only interested in one proxy
    if (redeploy) {
      howMany = 1; // override
    }
    // First time through: cache the service list as the current one.
    if (currentServiceList == null) {
      currentServiceList = serviceList;
    } else {
      // Reconcile the cached list with the fresh one. A positive result is treated as
      // "new services have been started", in which case the fresh list becomes current.
      int newServiceCount = VNSQuery.findUnassigned(currentServiceList, serviceList);
      if (newServiceCount > 0) {
        currentServiceList.clear();
        currentServiceList = serviceList;
      }
    }
  }

  int successfulConnections = 0;
  int rC = casProcessorConfig.getMaxRestartCount();
  int maxTimeToWait = casProcessorConfig.getMaxTimeToWaitBetweenRetries();
  // Never sleep indefinitely. Override if the maxTimeToWait = 0
  if (maxTimeToWait == 0) {
    maxTimeToWait = WAIT_TIME;
  }

  for (int i = 0; i < howMany; i++) {
    if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
      UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
              "initialize", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
              "UIMA_CPM_activating_service__FINEST",
              new Object[] { Thread.currentThread().getName(), aProcessingContainer.getName(),
                  String.valueOf(i), String.valueOf(howMany) });
    }
    int restartCount = 0;
    // Set once an attempt has failed or no service was available, so that every subsequent
    // pass through the retry loop pauses first. The very first attempt never sleeps.
    boolean sleepBetweenRetries = false;
    // Attempt to connect to the remote service and activate the proxy.
    do {
      try {
        if (exclusiveAccess) {
          serviceInfo = getNextAvailable(serviceList);
          if (serviceInfo == null) {
            if (!redeploy) {
              // No more services available. Report how many services we attached to so far
              return successfulConnections;
            }
            sleepBetweenRetries = true;
          }
        }
        if (sleepBetweenRetries) {
          try {
            Thread.sleep(maxTimeToWait);
          } catch (InterruptedException iex) {
            // Keep retrying as before, but preserve the interrupt status so that code further
            // up the stack can still observe that this thread was asked to stop.
            Thread.currentThread().interrupt();
          }
          if (serviceInfo == null && exclusiveAccess) {
            // Nothing to connect to yet: count this as a restart, refresh the service list
            // from the VNS and go back to the loop condition.
            restartCount++;
            serviceList = getNewServiceList(aServiceUri, casProcessorConfig);
            int newServiceCount = VNSQuery.findUnassigned(currentServiceList, serviceList);
            if (newServiceCount > 0) {
              currentServiceList.clear();
              currentServiceList = serviceList;
            }
            continue;
          }
        }
        if (exclusiveAccess) {
          activateProcessor(casProcessorConfig, serviceInfo.getHost(), serviceInfo.getPort(),
                  aProcessingContainer, redeploy);
          // Connected: this service instance is now taken.
          serviceInfo.setAvailable(false);
        } else {
          // Create and connect a new proxy to the service
          activateProcessor(casProcessorConfig, aServiceUri, aProcessingContainer, redeploy);
        }
        successfulConnections++;
        // On successful connection, just break out of the retry loop
        break;
      } catch (ResourceInitializationException ex) {
        if (ex.getCause() instanceof ServiceException
                || ex.getCause() instanceof ServiceDownException) {
          if (UIMAFramework.getLogger().isLoggable(Level.WARNING)) {
            UIMAFramework.getLogger(this.getClass()).logrb(Level.WARNING,
                    this.getClass().getName(), "initialize", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
                    "UIMA_CPM_activating_service__WARNING", new Object[] {
                        Thread.currentThread().getName(), aProcessingContainer.getName() });
          }
          // Increment number of CasProcessor restarts (redeploys)
          restartCount++;
          // Back off before the next attempt instead of immediately hammering a service that
          // has just reported itself as down.
          sleepBetweenRetries = true;
        } else {
          restartCount = rC + 1; // Not a recoverable service failure: force loop termination
        }
      }
    } while (restartCount <= rC);
    // If the max number of retries was exceeded, take the action configured for this condition:
    // the CasProcessor may be configured to terminate the CPM, disable itself, or disregard
    // the condition and continue.
    if (restartCount > rC) {
      handleMaxRestartThresholdReached(aProcessingContainer);
    }
  }
  return successfulConnections;
}