public void execute()

in tools/shell-commands/src/main/java/org/apache/unomi/shell/migration/impl/MigrationTo150.java [40:107]


    public void execute(MigrationContext migrationContext, BundleContext bundleContext) throws IOException {
        CloseableHttpClient httpClient = migrationContext.getHttpClient();
        String esAddress = migrationContext.getConfigString(MigrationConfig.CONFIG_ES_ADDRESS);
        String es5Address = migrationContext.askUserWithDefaultAnswer("SOURCE Elasticsearch 5.6 cluster address (default: http://localhost:9210) : ", "http://localhost:9210");
        String sourceIndexPrefix = migrationContext.getConfigString(MigrationConfig.INDEX_PREFIX);
        String destIndexPrefix = migrationContext.askUserWithDefaultAnswer("TARGET index prefix (default: context) : ", "context");
        int numberOfShards = Integer.parseInt(migrationContext.askUserWithDefaultAnswer("Number of shards for TARGET (default: 5) : ", "5"));
        int numberOfReplicas = Integer.parseInt(migrationContext.askUserWithDefaultAnswer("Number of replicas for TARGET (default: 1) : ", "1"));
        Set<String> monthlyIndexTypes = new HashSet<>();
        monthlyIndexTypes.add("event");
        monthlyIndexTypes.add("session");
        long startTime = System.currentTimeMillis();
        // let's try to connect to see if its correct
        JSONObject indicesStats = new JSONObject(HttpUtils.executeGetRequest(httpClient, es5Address + "/_stats", null));
        JSONObject indices = indicesStats.getJSONObject("indices");
        Set<String> indexNames = new TreeSet<>(indices.keySet());
        Set<String> monthlyIndexNames = new TreeSet<>();
        for (String indexName : indexNames) {
            if (indexName.startsWith(sourceIndexPrefix + "-")) {
                monthlyIndexNames.add(indexName);
            }
        }

        // now let's load all installed mappings but we will still be missing some from optional extensions such as the Salesforce connector
        for (Bundle bundle : bundleContext.getBundles()) {
            Enumeration<URL> predefinedMappings = bundle.findEntries("META-INF/cxs/mappings", "*.json", true);
            if (predefinedMappings == null) {
                continue;
            }
            while (predefinedMappings.hasMoreElements()) {
                URL predefinedMappingURL = predefinedMappings.nextElement();
                final String path = predefinedMappingURL.getPath();
                String itemType = path.substring(path.lastIndexOf('/') + 1, path.lastIndexOf('.'));
                String mappingDefinition = loadMappingFile(predefinedMappingURL);
                JSONObject newTypeMapping = new JSONObject(mappingDefinition);
                if (!monthlyIndexTypes.contains(itemType)) {
                    String indexName = "geonameEntry".equals(itemType) ? "geonames" : sourceIndexPrefix;

                    JSONObject es5TypeMapping = getES5TypeMapping(httpClient, es5Address, indexName, itemType);
                    int es5MappingsTotalFieldsLimit = getES5MappingsTotalFieldsLimit(httpClient, es5Address, indexName);
                    String destIndexName = itemType.toLowerCase();
                    if (!indexExists(httpClient, esAddress, destIndexPrefix, destIndexName)) {
                        createESIndex(httpClient, esAddress, destIndexPrefix, destIndexName, numberOfShards, numberOfReplicas, es5MappingsTotalFieldsLimit, getMergedTypeMapping(es5TypeMapping, newTypeMapping));
                        reIndex(migrationContext, httpClient, esAddress, es5Address, indexName, getIndexName(destIndexPrefix, destIndexName), itemType);
                    } else {
                        migrationContext.printMessage("Index " + getIndexName(destIndexPrefix, itemType.toLowerCase()) + " already exists, skipping re-indexation...");
                    }
                } else {
                    for (String indexName : monthlyIndexNames) {
                        // we need to extract the date part
                        String datePart = indexName.substring(sourceIndexPrefix.length() + 1);
                        String destIndexName = itemType.toLowerCase() + "-" + INDEX_DATE_PREFIX + datePart;
                        JSONObject es5TypeMapping = getES5TypeMapping(httpClient, es5Address, indexName, itemType);
                        int es5MappingsTotalFieldsLimit = getES5MappingsTotalFieldsLimit(httpClient, es5Address, indexName);
                        if (!indexExists(httpClient, esAddress, destIndexPrefix, destIndexName)) {
                            createESIndex(httpClient, esAddress, destIndexPrefix, destIndexName, numberOfShards, numberOfReplicas, es5MappingsTotalFieldsLimit, getMergedTypeMapping(es5TypeMapping, newTypeMapping));
                            reIndex(migrationContext, httpClient, esAddress, es5Address, indexName, getIndexName(destIndexPrefix, destIndexName), itemType);
                        } else {
                            migrationContext.printMessage("Index " + getIndexName(destIndexPrefix, destIndexName) + " already exists, skipping re-indexation...");
                        }
                    }
                }
            }
        }

        long totalMigrationTime = System.currentTimeMillis() - startTime;
        migrationContext.printMessage("Migration operations completed in " + totalMigrationTime + "ms");
    }