in tools/shell-commands/src/main/java/org/apache/unomi/shell/migration/impl/MigrationTo150.java [40:107]
public void execute(MigrationContext migrationContext, BundleContext bundleContext) throws IOException {
CloseableHttpClient httpClient = migrationContext.getHttpClient();
String esAddress = migrationContext.getConfigString(MigrationConfig.CONFIG_ES_ADDRESS);
String es5Address = migrationContext.askUserWithDefaultAnswer("SOURCE Elasticsearch 5.6 cluster address (default: http://localhost:9210) : ", "http://localhost:9210");
String sourceIndexPrefix = migrationContext.getConfigString(MigrationConfig.INDEX_PREFIX);
String destIndexPrefix = migrationContext.askUserWithDefaultAnswer("TARGET index prefix (default: context) : ", "context");
int numberOfShards = Integer.parseInt(migrationContext.askUserWithDefaultAnswer("Number of shards for TARGET (default: 5) : ", "5"));
int numberOfReplicas = Integer.parseInt(migrationContext.askUserWithDefaultAnswer("Number of replicas for TARGET (default: 1) : ", "1"));
Set<String> monthlyIndexTypes = new HashSet<>();
monthlyIndexTypes.add("event");
monthlyIndexTypes.add("session");
long startTime = System.currentTimeMillis();
// let's try to connect to see if its correct
JSONObject indicesStats = new JSONObject(HttpUtils.executeGetRequest(httpClient, es5Address + "/_stats", null));
JSONObject indices = indicesStats.getJSONObject("indices");
Set<String> indexNames = new TreeSet<>(indices.keySet());
Set<String> monthlyIndexNames = new TreeSet<>();
for (String indexName : indexNames) {
if (indexName.startsWith(sourceIndexPrefix + "-")) {
monthlyIndexNames.add(indexName);
}
}
// now let's load all installed mappings but we will still be missing some from optional extensions such as the Salesforce connector
for (Bundle bundle : bundleContext.getBundles()) {
Enumeration<URL> predefinedMappings = bundle.findEntries("META-INF/cxs/mappings", "*.json", true);
if (predefinedMappings == null) {
continue;
}
while (predefinedMappings.hasMoreElements()) {
URL predefinedMappingURL = predefinedMappings.nextElement();
final String path = predefinedMappingURL.getPath();
String itemType = path.substring(path.lastIndexOf('/') + 1, path.lastIndexOf('.'));
String mappingDefinition = loadMappingFile(predefinedMappingURL);
JSONObject newTypeMapping = new JSONObject(mappingDefinition);
if (!monthlyIndexTypes.contains(itemType)) {
String indexName = "geonameEntry".equals(itemType) ? "geonames" : sourceIndexPrefix;
JSONObject es5TypeMapping = getES5TypeMapping(httpClient, es5Address, indexName, itemType);
int es5MappingsTotalFieldsLimit = getES5MappingsTotalFieldsLimit(httpClient, es5Address, indexName);
String destIndexName = itemType.toLowerCase();
if (!indexExists(httpClient, esAddress, destIndexPrefix, destIndexName)) {
createESIndex(httpClient, esAddress, destIndexPrefix, destIndexName, numberOfShards, numberOfReplicas, es5MappingsTotalFieldsLimit, getMergedTypeMapping(es5TypeMapping, newTypeMapping));
reIndex(migrationContext, httpClient, esAddress, es5Address, indexName, getIndexName(destIndexPrefix, destIndexName), itemType);
} else {
migrationContext.printMessage("Index " + getIndexName(destIndexPrefix, itemType.toLowerCase()) + " already exists, skipping re-indexation...");
}
} else {
for (String indexName : monthlyIndexNames) {
// we need to extract the date part
String datePart = indexName.substring(sourceIndexPrefix.length() + 1);
String destIndexName = itemType.toLowerCase() + "-" + INDEX_DATE_PREFIX + datePart;
JSONObject es5TypeMapping = getES5TypeMapping(httpClient, es5Address, indexName, itemType);
int es5MappingsTotalFieldsLimit = getES5MappingsTotalFieldsLimit(httpClient, es5Address, indexName);
if (!indexExists(httpClient, esAddress, destIndexPrefix, destIndexName)) {
createESIndex(httpClient, esAddress, destIndexPrefix, destIndexName, numberOfShards, numberOfReplicas, es5MappingsTotalFieldsLimit, getMergedTypeMapping(es5TypeMapping, newTypeMapping));
reIndex(migrationContext, httpClient, esAddress, es5Address, indexName, getIndexName(destIndexPrefix, destIndexName), itemType);
} else {
migrationContext.printMessage("Index " + getIndexName(destIndexPrefix, destIndexName) + " already exists, skipping re-indexation...");
}
}
}
}
}
long totalMigrationTime = System.currentTimeMillis() - startTime;
migrationContext.printMessage("Migration operations completed in " + totalMigrationTime + "ms");
}