public void handleMigrations()

in streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMigrationManager.java [49:116]


  public void handleMigrations(SpServiceRegistration extensionsServiceConfig,
                               List<ModelMigratorConfig> migrationConfigs) {

    LOG.info("Received {} migrations from extension service {}.",
        migrationConfigs.size(),
        extensionsServiceConfig.getServiceUrl());
    LOG.info("Updating adapter descriptions by replacement...");
    updateDescriptions(migrationConfigs, extensionsServiceConfig.getServiceUrl());
    LOG.info("Adapter descriptions are up to date.");

    LOG.info("Checking migrations for existing adapters in StreamPipes Core ...");
    for (var migrationConfig : migrationConfigs) {
      LOG.info("Searching for assets of '{}'", migrationConfig.targetAppId());
      LOG.debug("Searching for assets of '{}' with config {}", migrationConfig.targetAppId(), migrationConfig);
      var adapterDescriptions = adapterStorage.getAdaptersByAppId(migrationConfig.targetAppId());
      LOG.info("Found {} instances for appId '{}'", adapterDescriptions.size(), migrationConfig.targetAppId());
      for (var adapterDescription : adapterDescriptions) {

        var adapterVersion = adapterDescription.getVersion();

        if (adapterVersion == migrationConfig.fromVersion()) {
          LOG.info("Migration is required for adapter '{}'. Migrating from version '{}' to '{}' ...",
              adapterDescription.getElementId(),
              adapterVersion, migrationConfig.toVersion()
          );

          var migrationResult = performMigration(
              adapterDescription,
              migrationConfig,
              String.format("%s/%s/adapter",
                  extensionsServiceConfig.getServiceUrl(),
                  MIGRATION_ENDPOINT
              )
          );

          if (migrationResult.success()) {
            LOG.info("Migration successfully performed by extensions service. Updating adapter description ...");
            LOG.debug(
                "Migration was performed by extensions service '{}'",
                extensionsServiceConfig.getServiceUrl());

            adapterStorage.updateElement(migrationResult.element());
            LOG.info("Adapter description is updated - Migration successfully completed at Core.");
          } else {
            LOG.error("Migration failed with the following reason: {}", migrationResult.message());
            LOG.error(
                "Migration for adapter '{}' failed - Stopping adapter ...",
                migrationResult.element().getElementId()
            );
            try {
              WorkerRestClient.stopStreamAdapter(extensionsServiceConfig.getServiceUrl(), adapterDescription);
            } catch (AdapterException e) {
              LOG.error("Stopping adapter failed: {}", StringUtils.join(e.getStackTrace(), "\n"));
            }
            LOG.info("Adapter successfully stopped.");
          }
        } else {
          LOG.info(
              "Migration is not applicable for adapter '{}' because of a version mismatch - "
                  + "adapter version: '{}',  migration starts at: '{}'",
              adapterDescription.getElementId(),
              adapterVersion,
              migrationConfig.fromVersion()
          );
        }
      }
    }
  }