in streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/PipelineElementMigrationManager.java [63:139]
/**
 * Applies the pipeline element migrations announced by an extensions service.
 *
 * <p>When {@code migrationConfigs} is empty, only an info message is logged.
 * Otherwise the pipeline element descriptions are updated via
 * {@code updateDescriptions}, all pipelines are loaded from the pipeline storage
 * and every pipeline for which {@code shouldMigratePipeline} holds is processed:
 * each data processor and data sink with an applicable migration is passed to
 * {@code migratePipelineElement}, the resulting elements are set on the pipeline
 * and the pipeline is written back to storage. If the list of failed migrations
 * handed to {@code migratePipelineElement} is non-empty afterwards, the pipeline
 * is re-read from storage and forwarded to {@code handleFailedMigrations}.
 *
 * @param extensionsServiceConfig registration of the extensions service; its service URL
 *                                is used to build the processor and sink migration endpoints
 * @param migrationConfigs        migration configurations received from that service
 */
public void handleMigrations(SpServiceRegistration extensionsServiceConfig,
                             List<ModelMigratorConfig> migrationConfigs) {
  // Nothing announced by the service: log and leave.
  if (migrationConfigs.isEmpty()) {
    LOG.info("No pipeline element migrations to perform");
    return;
  }

  LOG.info("Updating pipeline element descriptions by replacement...");
  var serviceUrl = extensionsServiceConfig.getServiceUrl();
  updateDescriptions(migrationConfigs, serviceUrl);
  LOG.info("Pipeline element descriptions are up to date.");
  LOG.info(
      "Received {} pipeline element migrations from extension service {}.",
      migrationConfigs.size(),
      serviceUrl
  );

  var storedPipelines = pipelineStorage.findAll();
  if (!storedPipelines.isEmpty()) {
    LOG.info(
        "Found {} available pipelines. Checking pipelines for applicable migrations...",
        storedPipelines.size()
    );
  }

  // Both endpoints depend only on the service URL, so they are built once up front.
  var processorEndpoint = String.format("%s/%s/processor", serviceUrl, MIGRATION_ENDPOINT);
  var sinkEndpoint = String.format("%s/%s/sink", serviceUrl, MIGRATION_ENDPOINT);

  for (var storedPipeline : storedPipelines) {
    if (!shouldMigratePipeline(storedPipeline, migrationConfigs)) {
      continue;
    }

    // Handed to migratePipelineElement for every element of this pipeline.
    List<MigrationResult<?>> failures = new ArrayList<>();

    var processorsAfterMigration = storedPipeline.getSepas()
        .stream()
        .map(dataProcessor -> {
          if (!getApplicableMigration(dataProcessor, migrationConfigs).isPresent()) {
            LOG.info("No migration applicable for data processor '{}'.", dataProcessor.getElementId());
            return dataProcessor;
          }
          return migratePipelineElement(dataProcessor, migrationConfigs, processorEndpoint, failures);
        })
        .toList();
    storedPipeline.setSepas(processorsAfterMigration);

    var sinksAfterMigration = storedPipeline.getActions()
        .stream()
        .map(dataSink -> {
          if (!getApplicableMigration(dataSink, migrationConfigs).isPresent()) {
            LOG.info("No migration applicable for data sink '{}'.", dataSink.getElementId());
            return dataSink;
          }
          return migratePipelineElement(dataSink, migrationConfigs, sinkEndpoint, failures);
        })
        .toList();
    storedPipeline.setActions(sinksAfterMigration);

    pipelineStorage.updateElement(storedPipeline);

    if (!failures.isEmpty()) {
      // pass most recent version of pipeline
      var latestPipeline = pipelineStorage.getElementById(storedPipeline.getPipelineId());
      handleFailedMigrations(latestPipeline, failures);
    } else {
      LOG.info("Migration for pipeline successfully completed.");
    }
  }
}