in persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java [453:522]
public void start() throws Exception {
// Work around to avoid ES Logs regarding the deprecated [ignore_throttled] parameter
try {
Level lvl = Level.toLevel(logLevelRestClient, Level.ERROR);
org.apache.log4j.Logger.getLogger("org.elasticsearch.client.RestClient").setLevel(lvl);
} catch (Exception e) {
// Never fail because of the set of the logger
}
// on startup
new InClassLoaderExecute<Object>(null, null, this.bundleContext, this.fatalIllegalStateErrors, throwExceptions) {
public Object execute(Object... args) throws Exception {
buildClient();
MainResponse response = client.info(RequestOptions.DEFAULT);
MainResponse.Version version = response.getVersion();
Version clusterVersion = Version.fromString(version.getNumber());
Version minimalVersion = Version.fromString(minimalElasticSearchVersion);
Version maximalVersion = Version.fromString(maximalElasticSearchVersion);
if (clusterVersion.before(minimalVersion) ||
clusterVersion.equals(maximalVersion) ||
clusterVersion.after(maximalVersion)) {
throw new Exception("ElasticSearch version is not within [" + minimalVersion + "," + maximalVersion + "), aborting startup !");
}
registerRolloverLifecyclePolicy();
loadPredefinedMappings(bundleContext, false);
loadPainlessScripts(bundleContext);
// load predefined mappings and condition dispatchers of any bundles that were started before this one.
for (Bundle existingBundle : bundleContext.getBundles()) {
if (existingBundle.getBundleContext() != null) {
loadPredefinedMappings(existingBundle.getBundleContext(), false);
loadPainlessScripts(existingBundle.getBundleContext());
}
}
if (client != null && bulkProcessor == null) {
bulkProcessor = getBulkProcessor();
}
// Wait for green
LOGGER.info("Waiting for GREEN cluster status...");
client.cluster().health(new ClusterHealthRequest().waitForGreenStatus(), RequestOptions.DEFAULT);
LOGGER.info("Cluster status is GREEN");
// We keep in memory the latest available session index to be able to load session using direct GET access on ES
if (isItemTypeRollingOver(Session.ITEM_TYPE)) {
LOGGER.info("Sessions are using rollover indices, loading latest session index available ...");
GetAliasesResponse sessionAliasResponse = client.indices().getAlias(new GetAliasesRequest(getIndex(Session.ITEM_TYPE)), RequestOptions.DEFAULT);
Map<String, Set<AliasMetaData>> aliases = sessionAliasResponse.getAliases();
if (!aliases.isEmpty()) {
sessionLatestIndex = new TreeSet<>(aliases.keySet()).last();
LOGGER.info("Latest available session index found is: {}", sessionLatestIndex);
} else {
throw new IllegalStateException("No index found for sessions");
}
}
return true;
}
}.executeInClassLoader();
bundleContext.addBundleListener(this);
LOGGER.info("{} service started successfully.", this.getClass().getName());
}