public void handleMigrations()

in streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/PipelineElementMigrationManager.java [63:139]


  public void handleMigrations(SpServiceRegistration extensionsServiceConfig,
                               List<ModelMigratorConfig> migrationConfigs) {
    if (!migrationConfigs.isEmpty()) {
      LOG.info("Updating pipeline element descriptions by replacement...");
      updateDescriptions(migrationConfigs, extensionsServiceConfig.getServiceUrl());
      LOG.info("Pipeline element descriptions are up to date.");

      LOG.info("Received {} pipeline element migrations from extension service {}.",
          migrationConfigs.size(),
          extensionsServiceConfig.getServiceUrl());
      var availablePipelines = pipelineStorage.findAll();
      if (!availablePipelines.isEmpty()) {
        LOG.info("Found {} available pipelines. Checking pipelines for applicable migrations...",
            availablePipelines.size()
        );
      }

      for (var pipeline : availablePipelines) {
        if (shouldMigratePipeline(pipeline, migrationConfigs)) {
          List<MigrationResult<?>> failedMigrations = new ArrayList<>();

          var migratedDataProcessors = pipeline.getSepas()
              .stream()
              .map(processor -> {
                if (getApplicableMigration(processor, migrationConfigs).isPresent()) {
                  return migratePipelineElement(
                      processor,
                      migrationConfigs,
                      String.format("%s/%s/processor",
                          extensionsServiceConfig.getServiceUrl(),
                          MIGRATION_ENDPOINT
                      ),
                      failedMigrations
                  );
                } else {
                  LOG.info("No migration applicable for data processor '{}'.", processor.getElementId());
                  return processor;
                }
              })
              .toList();
          pipeline.setSepas(migratedDataProcessors);

          var migratedDataSinks = pipeline.getActions()
              .stream()
              .map(sink -> {
                if (getApplicableMigration(sink, migrationConfigs).isPresent()) {
                  return migratePipelineElement(
                      sink,
                      migrationConfigs,
                      String.format("%s/%s/sink",
                          extensionsServiceConfig.getServiceUrl(),
                          MIGRATION_ENDPOINT
                      ),
                      failedMigrations
                  );
                } else {
                  LOG.info("No migration applicable for data sink '{}'.", sink.getElementId());
                  return sink;
                }
              })
              .toList();
          pipeline.setActions(migratedDataSinks);

          pipelineStorage.updateElement(pipeline);

          if (failedMigrations.isEmpty()) {
            LOG.info("Migration for pipeline successfully completed.");
          } else {
            // pass most recent version of pipeline
            handleFailedMigrations(pipelineStorage.getElementById(pipeline.getPipelineId()), failedMigrations);
          }
        }
      }
    } else {
      LOG.info("No pipeline element migrations to perform");
    }
  }