in nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessDataflowFactory.java [106:332]
public StatelessDataflow createDataflow(final StatelessEngineConfiguration engineConfiguration, final DataflowDefinition dataflowDefinition,
final ClassLoader extensionRootClassLoader)
throws IOException, StatelessConfigurationException {
final long start = System.currentTimeMillis();
ProvenanceRepository provenanceRepo = null;
ContentRepository contentRepo = null;
StatelessProcessScheduler processScheduler = null;
FlowFileRepository flowFileRepo = null;
FlowFileEventRepository flowFileEventRepo = null;
try {
final BulletinRepository bulletinRepository = new VolatileBulletinRepository();
final File workingDir = engineConfiguration.getWorkingDirectory();
final File narExpansionDirectory = new File(workingDir, "nar");
if (!narExpansionDirectory.exists() && !narExpansionDirectory.mkdirs()) {
throw new IOException("Working Directory " + narExpansionDirectory + " does not exist and could not be created");
}
final NarClassLoaders narClassLoaders = new NarClassLoaders();
final File extensionsWorkingDir = new File(narExpansionDirectory, "extensions");
final ExtensionDiscoveringManager extensionManager = ExtensionDiscovery.discover(
extensionsWorkingDir,
extensionRootClassLoader,
narClassLoaders,
engineConfiguration.isLogExtensionDiscovery()
);
final AssetManager assetManager = new StandardAssetManager();
final File assetDir = new File(workingDir, "assets");
final AssetManagerInitializationContext assetManagerInitializationContext = new AssetManagerInitializationContext() {
@Override
public AssetReferenceLookup getAssetReferenceLookup() {
return Set::of;
}
@Override
public Map<String, String> getProperties() {
return Map.of(StandardAssetManager.ASSET_STORAGE_LOCATION_PROPERTY, assetDir.getAbsolutePath());
}
@Override
public NodeTypeProvider getNodeTypeProvider() {
return new StatelessNodeTypeProvider();
}
};
assetManager.initialize(assetManagerInitializationContext);
flowFileEventRepo = new RingBufferEventRepository(5);
final StatelessStateManagerProvider stateManagerProvider = new StatelessStateManagerProvider();
final ParameterContextManager parameterContextManager = new StandardParameterContextManager();
final Duration processorStartTimeoutDuration = Duration.ofSeconds((long) FormatUtils.getPreciseTimeDuration(engineConfiguration.getProcessorStartTimeout(), TimeUnit.SECONDS));
processScheduler = new StatelessProcessScheduler(extensionManager, processorStartTimeoutDuration);
provenanceRepo = new StatelessProvenanceRepository(1_000);
provenanceRepo.initialize(EventReporter.NO_OP, new StatelessAuthorizer(), new StatelessProvenanceAuthorizableFactory(), IdentifierLookup.EMPTY);
final SSLContext sslContext;
try {
sslContext = SslConfigurationUtil.createSslContext(engineConfiguration.getSslContext());
} catch (StatelessConfigurationException e) {
throw new StatelessConfigurationException("Could not create SSLContext", e);
}
// Build Extension Repository
final List<ExtensionClient> extensionClients = new ArrayList<>();
for (final ExtensionClientDefinition extensionClientDefinition : engineConfiguration.getExtensionClients()) {
final ExtensionClient extensionClient = createExtensionClient(extensionClientDefinition, engineConfiguration.getSslContext());
extensionClients.add(extensionClient);
}
final ExtensionRepository extensionRepository = new FileSystemExtensionRepository(extensionManager, engineConfiguration, narClassLoaders, extensionClients);
extensionRepository.initialize();
final PropertyEncryptor lazyInitializedEncryptor = new PropertyEncryptor() {
private PropertyEncryptor created = null;
@Override
public String encrypt(final String property) {
return getEncryptor().encrypt(property);
}
@Override
public String decrypt(final String encryptedProperty) {
return getEncryptor().decrypt(encryptedProperty);
}
private synchronized PropertyEncryptor getEncryptor() {
if (created != null) {
return created;
}
created = new PropertyEncryptorBuilder(engineConfiguration.getSensitivePropsKey())
.setAlgorithm(PropertyEncryptionMethod.NIFI_PBKDF2_AES_GCM_256.toString())
.build();
return created;
}
};
final CounterRepository counterRepo = new StandardCounterRepository();
final File krb5File = engineConfiguration.getKrb5File();
final KerberosConfig kerberosConfig = new KerberosConfig(null, null, krb5File);
if (krb5File != null) {
logger.info("Setting java.security.krb5.conf to {}", krb5File.getAbsolutePath());
System.setProperty("java.security.krb5.conf", krb5File.getAbsolutePath());
}
final StatelessEngine statelessEngine = new StandardStatelessEngine.Builder()
.bulletinRepository(bulletinRepository)
.encryptor(lazyInitializedEncryptor)
.extensionManager(extensionManager)
.assetManager(assetManager)
.stateManagerProvider(stateManagerProvider)
.processScheduler(processScheduler)
.kerberosConfiguration(kerberosConfig)
.flowFileEventRepository(flowFileEventRepo)
.provenanceRepository(provenanceRepo)
.extensionRepository(extensionRepository)
.counterRepository(counterRepo)
.statusTaskInterval(engineConfiguration.getStatusTaskInterval())
.componentEnableTimeout(engineConfiguration.getComponentEnableTimeout())
.build();
final StatelessFlowManager flowManager = new StatelessFlowManager(flowFileEventRepo, parameterContextManager, statelessEngine, () -> true, sslContext, bulletinRepository);
flowManager.createFlowRegistryClient(InMemoryFlowRegistry.class.getTypeName(), "in-memory-flow-registry", null, Collections.emptySet(), true, true, null);
((InMemoryFlowRegistry) flowManager.getFlowRegistryClient("in-memory-flow-registry").getComponent()).addFlowSnapshot(dataflowDefinition.getVersionedExternalFlow());
final ControllerServiceProvider controllerServiceProvider = new StandardControllerServiceProvider(processScheduler, bulletinRepository, flowManager, extensionManager);
final ProcessContextFactory rawProcessContextFactory = new StatelessProcessContextFactory(controllerServiceProvider, stateManagerProvider);
final ProcessContextFactory processContextFactory = new CachingProcessContextFactory(rawProcessContextFactory);
contentRepo = createContentRepository(engineConfiguration);
flowFileRepo = new StatelessFlowFileRepository();
final RepositoryContextFactory repositoryContextFactory = new StatelessRepositoryContextFactory(contentRepo, flowFileRepo, flowFileEventRepo,
counterRepo, provenanceRepo, stateManagerProvider);
final StatelessEngineInitializationContext statelessEngineInitializationContext = new StatelessEngineInitializationContext(controllerServiceProvider, flowManager, processContextFactory,
repositoryContextFactory);
final String flowName = dataflowDefinition.getFlowName();
final String threadNameSuffix = flowName == null ? "" : " for dataflow " + flowName;
final StatelessProcessSchedulerInitializationContext schedulerInitializationContext = new StatelessProcessSchedulerInitializationContext.Builder()
.componentLifeCycleThreadPool(new FlowEngine(8, "Component Lifecycle" + threadNameSuffix, true))
.componentMonitoringThreadPool(new FlowEngine(2, "Monitor Processor Lifecycle" + threadNameSuffix, true))
.frameworkTaskThreadPool(new FlowEngine(2, "Framework Task" + threadNameSuffix, true))
.processContextFactory(processContextFactory)
.manageThreadPools(true)
.build();
processScheduler.initialize(schedulerInitializationContext);
statelessEngine.initialize(statelessEngineInitializationContext);
// Initialize components. This is generally needed because of the interdependencies between the components.
// There are some circular dependencies that are resolved by passing objects via initialization rather than by providing to the constructors.
final ResourceClaimManager resourceClaimManager = new StandardResourceClaimManager();
final EventReporter eventReporter = (severity, category, message) -> {
final Bulletin bulletin = BulletinFactory.createBulletin(category, severity.name(), message);
bulletinRepository.addBulletin(bulletin);
};
contentRepo.initialize(new ContentRepositoryContext() {
@Override
public ResourceClaimManager getResourceClaimManager() {
return resourceClaimManager;
}
@Override
public EventReporter getEventReporter() {
return eventReporter;
}
});
flowFileRepo.initialize(resourceClaimManager);
final PythonBridge pythonBridge = new DisabledPythonBridge();
flowManager.initialize(
controllerServiceProvider,
pythonBridge,
null,
null
);
// Create flow
final ProcessGroup rootGroup = flowManager.createProcessGroup("root");
rootGroup.setName("root");
flowManager.setRootGroup(rootGroup);
final StatelessDataflow dataflow = statelessEngine.createFlow(dataflowDefinition);
final long millis = System.currentTimeMillis() - start;
logger.info("NiFi Stateless Engine and Dataflow created and initialized in {} millis", millis);
return dataflow;
} catch (final Exception e) {
try {
if (provenanceRepo != null) {
provenanceRepo.close();
}
} catch (final IOException ioe) {
e.addSuppressed(ioe);
}
if (contentRepo != null) {
contentRepo.shutdown();
}
if (processScheduler != null) {
processScheduler.shutdown();
}
if (flowFileRepo != null) {
try {
flowFileRepo.close();
} catch (final IOException ioe) {
e.addSuppressed(ioe);
}
}
if (flowFileEventRepo != null) {
try {
flowFileEventRepo.close();
} catch (final IOException ioe) {
e.addSuppressed(ioe);
}
}
throw e;
}
}