public void start()

in persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java [453:522]


    /**
     * Starts this persistence service.
     * <p>
     * The startup sequence, executed through an {@code InClassLoaderExecute} wrapper, is:
     * <ol>
     *   <li>build the client and read the cluster version from {@code client.info(...)};</li>
     *   <li>abort if that version is outside {@code [minimalElasticSearchVersion, maximalElasticSearchVersion)};</li>
     *   <li>call {@code registerRolloverLifecyclePolicy()}, then load predefined mappings and painless scripts
     *       for this bundle and for every bundle that already has a bundle context;</li>
     *   <li>create the bulk processor if it does not exist yet;</li>
     *   <li>wait for the cluster to report GREEN status;</li>
     *   <li>when sessions use rollover indices, remember the latest session index in {@code sessionLatestIndex}.</li>
     * </ol>
     * Finally this instance is registered as a bundle listener on {@code bundleContext}.
     *
     * @throws Exception if the cluster version is out of the supported range, if no session index can be found
     *                   while sessions use rollover indices, or if any of the startup steps fails.
     *                   NOTE(review): whether these failures actually propagate to the caller presumably depends
     *                   on how {@code InClassLoaderExecute} honours {@code throwExceptions} and
     *                   {@code fatalIllegalStateErrors} - confirm against that class.
     */
    public void start() throws Exception {

        // Work around to avoid ES logs regarding the deprecated [ignore_throttled] parameter
        try {
            // Level.ERROR is the fallback passed to toLevel for the configured logLevelRestClient value
            Level lvl = Level.toLevel(logLevelRestClient, Level.ERROR);
            org.apache.log4j.Logger.getLogger("org.elasticsearch.client.RestClient").setLevel(lvl);
        } catch (Exception e) {
            // Never fail because of the logger setup, but keep a trace of the problem for troubleshooting
            LOGGER.debug("Unable to set the log level of org.elasticsearch.client.RestClient", e);
        }

        // on startup
        new InClassLoaderExecute<Object>(null, null, this.bundleContext, this.fatalIllegalStateErrors, throwExceptions) {
            public Object execute(Object... args) throws Exception {

                buildClient();

                MainResponse response = client.info(RequestOptions.DEFAULT);
                MainResponse.Version version = response.getVersion();
                Version clusterVersion = Version.fromString(version.getNumber());
                Version minimalVersion = Version.fromString(minimalElasticSearchVersion);
                Version maximalVersion = Version.fromString(maximalElasticSearchVersion);
                // Supported range is [minimalVersion, maximalVersion): the upper bound is exclusive
                if (clusterVersion.before(minimalVersion) ||
                        clusterVersion.equals(maximalVersion) ||
                        clusterVersion.after(maximalVersion)) {
                    throw new Exception("ElasticSearch version " + clusterVersion + " is not within ["
                            + minimalVersion + "," + maximalVersion + "), aborting startup !");
                }

                registerRolloverLifecyclePolicy();

                loadPredefinedMappings(bundleContext, false);
                loadPainlessScripts(bundleContext);

                // load predefined mappings and condition dispatchers of any bundles that were started before this one.
                for (Bundle existingBundle : bundleContext.getBundles()) {
                    if (existingBundle.getBundleContext() != null) {
                        loadPredefinedMappings(existingBundle.getBundleContext(), false);
                        loadPainlessScripts(existingBundle.getBundleContext());
                    }
                }

                if (client != null && bulkProcessor == null) {
                    bulkProcessor = getBulkProcessor();
                }

                // Wait for green
                LOGGER.info("Waiting for GREEN cluster status...");
                // The health call returns normally when the wait times out, so the response has to be
                // inspected before claiming that the cluster is GREEN.
                if (client.cluster().health(new ClusterHealthRequest().waitForGreenStatus(), RequestOptions.DEFAULT).isTimedOut()) {
                    LOGGER.warn("Timed out while waiting for GREEN cluster status, continuing startup anyway");
                } else {
                    LOGGER.info("Cluster status is GREEN");
                }

                // We keep in memory the latest available session index to be able to load session using direct GET access on ES
                if (isItemTypeRollingOver(Session.ITEM_TYPE)) {
                    LOGGER.info("Sessions are using rollover indices, loading latest session index available ...");
                    GetAliasesResponse sessionAliasResponse = client.indices().getAlias(new GetAliasesRequest(getIndex(Session.ITEM_TYPE)), RequestOptions.DEFAULT);
                    Map<String, Set<AliasMetaData>> aliases = sessionAliasResponse.getAliases();
                    if (!aliases.isEmpty()) {
                        // Keys are index names; the TreeSet orders them as Strings, so last() is the greatest name
                        sessionLatestIndex = new TreeSet<>(aliases.keySet()).last();
                        LOGGER.info("Latest available session index found is: {}", sessionLatestIndex);
                    } else {
                        throw new IllegalStateException("No index found for sessions");
                    }
                }

                return true;
            }
        }.executeInClassLoader();

        bundleContext.addBundleListener(this);

        LOGGER.info("{} service started successfully.", this.getClass().getName());
    }