in streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMigrationManager.java [49:116]
public void handleMigrations(SpServiceRegistration extensionsServiceConfig,
List<ModelMigratorConfig> migrationConfigs) {
LOG.info("Received {} migrations from extension service {}.",
migrationConfigs.size(),
extensionsServiceConfig.getServiceUrl());
LOG.info("Updating adapter descriptions by replacement...");
updateDescriptions(migrationConfigs, extensionsServiceConfig.getServiceUrl());
LOG.info("Adapter descriptions are up to date.");
LOG.info("Checking migrations for existing adapters in StreamPipes Core ...");
for (var migrationConfig : migrationConfigs) {
LOG.info("Searching for assets of '{}'", migrationConfig.targetAppId());
LOG.debug("Searching for assets of '{}' with config {}", migrationConfig.targetAppId(), migrationConfig);
var adapterDescriptions = adapterStorage.getAdaptersByAppId(migrationConfig.targetAppId());
LOG.info("Found {} instances for appId '{}'", adapterDescriptions.size(), migrationConfig.targetAppId());
for (var adapterDescription : adapterDescriptions) {
var adapterVersion = adapterDescription.getVersion();
if (adapterVersion == migrationConfig.fromVersion()) {
LOG.info("Migration is required for adapter '{}'. Migrating from version '{}' to '{}' ...",
adapterDescription.getElementId(),
adapterVersion, migrationConfig.toVersion()
);
var migrationResult = performMigration(
adapterDescription,
migrationConfig,
String.format("%s/%s/adapter",
extensionsServiceConfig.getServiceUrl(),
MIGRATION_ENDPOINT
)
);
if (migrationResult.success()) {
LOG.info("Migration successfully performed by extensions service. Updating adapter description ...");
LOG.debug(
"Migration was performed by extensions service '{}'",
extensionsServiceConfig.getServiceUrl());
adapterStorage.updateElement(migrationResult.element());
LOG.info("Adapter description is updated - Migration successfully completed at Core.");
} else {
LOG.error("Migration failed with the following reason: {}", migrationResult.message());
LOG.error(
"Migration for adapter '{}' failed - Stopping adapter ...",
migrationResult.element().getElementId()
);
try {
WorkerRestClient.stopStreamAdapter(extensionsServiceConfig.getServiceUrl(), adapterDescription);
} catch (AdapterException e) {
LOG.error("Stopping adapter failed: {}", StringUtils.join(e.getStackTrace(), "\n"));
}
LOG.info("Adapter successfully stopped.");
}
} else {
LOG.info(
"Migration is not applicable for adapter '{}' because of a version mismatch - "
+ "adapter version: '{}', migration starts at: '{}'",
adapterDescription.getElementId(),
adapterVersion,
migrationConfig.fromVersion()
);
}
}
}
}