public StatelessDataflow createDataflow()

in nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessDataflowFactory.java [106:332]


    /**
     * Creates the components of a stateless engine (extension manager, asset manager, repositories, process scheduler,
     * flow manager), wires them together, and then asks the engine to create the dataflow described by the given definition.
     *
     * <p>Side effect: when the engine configuration supplies a krb5 file, the {@code java.security.krb5.conf} system
     * property is set to that file's absolute path.</p>
     *
     * <p>If any step fails, the provenance, content, FlowFile and FlowFile-event repositories and the process scheduler
     * that were already created are closed / shut down before the exception is rethrown. Each cleanup step is attempted
     * independently of the others; an exception raised by a cleanup step is attached to the original exception as a
     * suppressed exception instead of replacing it.</p>
     *
     * @param engineConfiguration supplies the working directory, extension clients, SSL settings, sensitive properties key,
     *                            krb5 file, and the timeout / interval settings read by this method
     * @param dataflowDefinition the flow to create; its versioned external flow is also added to the in-memory flow registry
     * @param extensionRootClassLoader the class loader handed to extension discovery
     * @return the dataflow created by the engine
     * @throws IOException if the NAR expansion directory does not exist and cannot be created, or if a component fails with an I/O error
     * @throws StatelessConfigurationException if the SSLContext cannot be created or the configuration is otherwise rejected by a component
     */
    public StatelessDataflow createDataflow(final StatelessEngineConfiguration engineConfiguration, final DataflowDefinition dataflowDefinition,
                                            final ClassLoader extensionRootClassLoader)
                    throws IOException, StatelessConfigurationException {
        final long start = System.currentTimeMillis();

        // Declared outside of the try block so that the catch block can release whichever of them were created before a failure.
        ProvenanceRepository provenanceRepo = null;
        ContentRepository contentRepo = null;
        StatelessProcessScheduler processScheduler = null;
        FlowFileRepository flowFileRepo = null;
        FlowFileEventRepository flowFileEventRepo = null;

        try {
            final BulletinRepository bulletinRepository = new VolatileBulletinRepository();
            final File workingDir = engineConfiguration.getWorkingDirectory();
            final File narExpansionDirectory = new File(workingDir, "nar");
            if (!narExpansionDirectory.exists() && !narExpansionDirectory.mkdirs()) {
                throw new IOException("Working Directory " + narExpansionDirectory + " does not exist and could not be created");
            }

            final NarClassLoaders narClassLoaders = new NarClassLoaders();
            final File extensionsWorkingDir = new File(narExpansionDirectory, "extensions");
            final ExtensionDiscoveringManager extensionManager = ExtensionDiscovery.discover(
                    extensionsWorkingDir,
                    extensionRootClassLoader,
                    narClassLoaders,
                    engineConfiguration.isLogExtensionDiscovery()
            );

            // Assets are stored beneath the working directory; the lookup handed to the asset manager always yields an empty Set.
            final AssetManager assetManager = new StandardAssetManager();
            final File assetDir = new File(workingDir, "assets");
            final AssetManagerInitializationContext assetManagerInitializationContext = new AssetManagerInitializationContext() {
                @Override
                public AssetReferenceLookup getAssetReferenceLookup() {
                    return Set::of;
                }

                @Override
                public Map<String, String> getProperties() {
                    return Map.of(StandardAssetManager.ASSET_STORAGE_LOCATION_PROPERTY, assetDir.getAbsolutePath());
                }

                @Override
                public NodeTypeProvider getNodeTypeProvider() {
                    return new StatelessNodeTypeProvider();
                }
            };
            assetManager.initialize(assetManagerInitializationContext);

            flowFileEventRepo = new RingBufferEventRepository(5);

            final StatelessStateManagerProvider stateManagerProvider = new StatelessStateManagerProvider();

            final ParameterContextManager parameterContextManager = new StandardParameterContextManager();
            // NOTE(review): the configured timeout is converted to seconds and cast to long, which appears to drop any
            // fractional-second portion - confirm that sub-second timeouts are not expected here.
            final Duration processorStartTimeoutDuration = Duration.ofSeconds((long) FormatUtils.getPreciseTimeDuration(engineConfiguration.getProcessorStartTimeout(), TimeUnit.SECONDS));
            processScheduler = new StatelessProcessScheduler(extensionManager, processorStartTimeoutDuration);
            provenanceRepo = new StatelessProvenanceRepository(1_000);
            provenanceRepo.initialize(EventReporter.NO_OP, new StatelessAuthorizer(), new StatelessProvenanceAuthorizableFactory(), IdentifierLookup.EMPTY);

            final SSLContext sslContext;
            try {
                sslContext = SslConfigurationUtil.createSslContext(engineConfiguration.getSslContext());
            } catch (StatelessConfigurationException e) {
                throw new StatelessConfigurationException("Could not create SSLContext", e);
            }

            // Build Extension Repository
            final List<ExtensionClient> extensionClients = new ArrayList<>();
            for (final ExtensionClientDefinition extensionClientDefinition : engineConfiguration.getExtensionClients()) {
                final ExtensionClient extensionClient = createExtensionClient(extensionClientDefinition, engineConfiguration.getSslContext());
                extensionClients.add(extensionClient);
            }

            final ExtensionRepository extensionRepository = new FileSystemExtensionRepository(extensionManager, engineConfiguration, narClassLoaders, extensionClients);
            extensionRepository.initialize();

            // The actual encryptor is not built here; it is built the first time encrypt() or decrypt() is called and then reused.
            final PropertyEncryptor lazyInitializedEncryptor = new PropertyEncryptor() {
                private PropertyEncryptor created = null;

                @Override
                public String encrypt(final String property) {
                    return getEncryptor().encrypt(property);
                }

                @Override
                public String decrypt(final String encryptedProperty) {
                    return getEncryptor().decrypt(encryptedProperty);
                }

                private synchronized PropertyEncryptor getEncryptor() {
                    if (created != null) {
                        return created;
                    }

                    created = new PropertyEncryptorBuilder(engineConfiguration.getSensitivePropsKey())
                            .setAlgorithm(PropertyEncryptionMethod.NIFI_PBKDF2_AES_GCM_256.toString())
                            .build();
                    return created;
                }
            };

            final CounterRepository counterRepo = new StandardCounterRepository();

            final File krb5File = engineConfiguration.getKrb5File();
            final KerberosConfig kerberosConfig = new KerberosConfig(null, null, krb5File);
            if (krb5File != null) {
                logger.info("Setting java.security.krb5.conf to {}", krb5File.getAbsolutePath());
                System.setProperty("java.security.krb5.conf", krb5File.getAbsolutePath());
            }

            final StatelessEngine statelessEngine = new StandardStatelessEngine.Builder()
                    .bulletinRepository(bulletinRepository)
                    .encryptor(lazyInitializedEncryptor)
                    .extensionManager(extensionManager)
                    .assetManager(assetManager)
                    .stateManagerProvider(stateManagerProvider)
                    .processScheduler(processScheduler)
                    .kerberosConfiguration(kerberosConfig)
                    .flowFileEventRepository(flowFileEventRepo)
                    .provenanceRepository(provenanceRepo)
                    .extensionRepository(extensionRepository)
                    .counterRepository(counterRepo)
                    .statusTaskInterval(engineConfiguration.getStatusTaskInterval())
                    .componentEnableTimeout(engineConfiguration.getComponentEnableTimeout())
                    .build();

            // Register an in-memory flow registry client and add the definition's versioned flow to it.
            final StatelessFlowManager flowManager = new StatelessFlowManager(flowFileEventRepo, parameterContextManager, statelessEngine, () -> true, sslContext, bulletinRepository);
            flowManager.createFlowRegistryClient(InMemoryFlowRegistry.class.getTypeName(), "in-memory-flow-registry", null, Collections.emptySet(), true, true, null);
            ((InMemoryFlowRegistry) flowManager.getFlowRegistryClient("in-memory-flow-registry").getComponent()).addFlowSnapshot(dataflowDefinition.getVersionedExternalFlow());

            final ControllerServiceProvider controllerServiceProvider = new StandardControllerServiceProvider(processScheduler, bulletinRepository, flowManager, extensionManager);

            final ProcessContextFactory rawProcessContextFactory = new StatelessProcessContextFactory(controllerServiceProvider, stateManagerProvider);
            final ProcessContextFactory processContextFactory = new CachingProcessContextFactory(rawProcessContextFactory);
            contentRepo = createContentRepository(engineConfiguration);
            flowFileRepo = new StatelessFlowFileRepository();

            final RepositoryContextFactory repositoryContextFactory = new StatelessRepositoryContextFactory(contentRepo, flowFileRepo, flowFileEventRepo,
                counterRepo, provenanceRepo, stateManagerProvider);
            final StatelessEngineInitializationContext statelessEngineInitializationContext = new StatelessEngineInitializationContext(controllerServiceProvider, flowManager, processContextFactory,
                repositoryContextFactory);

            // Thread pool names carry the flow name, when one is defined.
            final String flowName = dataflowDefinition.getFlowName();
            final String threadNameSuffix = flowName == null ? "" : " for dataflow " + flowName;
            final StatelessProcessSchedulerInitializationContext schedulerInitializationContext = new StatelessProcessSchedulerInitializationContext.Builder()
                .componentLifeCycleThreadPool(new FlowEngine(8, "Component Lifecycle" + threadNameSuffix, true))
                .componentMonitoringThreadPool(new FlowEngine(2, "Monitor Processor Lifecycle" + threadNameSuffix, true))
                .frameworkTaskThreadPool(new FlowEngine(2, "Framework Task" + threadNameSuffix, true))
                .processContextFactory(processContextFactory)
                .manageThreadPools(true)
                .build();
            processScheduler.initialize(schedulerInitializationContext);
            statelessEngine.initialize(statelessEngineInitializationContext);

            // Initialize components. This is generally needed because of the interdependencies between the components.
            // There are some circular dependencies that are resolved by passing objects via initialization rather than by providing to the constructors.
            final ResourceClaimManager resourceClaimManager = new StandardResourceClaimManager();
            // Events reported by the content repository are turned into bulletins.
            final EventReporter eventReporter = (severity, category, message) -> {
                final Bulletin bulletin = BulletinFactory.createBulletin(category, severity.name(), message);
                bulletinRepository.addBulletin(bulletin);
            };
            contentRepo.initialize(new ContentRepositoryContext() {
                @Override
                public ResourceClaimManager getResourceClaimManager() {
                    return resourceClaimManager;
                }

                @Override
                public EventReporter getEventReporter() {
                    return eventReporter;
                }
            });
            flowFileRepo.initialize(resourceClaimManager);

            final PythonBridge pythonBridge = new DisabledPythonBridge();
            flowManager.initialize(
                    controllerServiceProvider,
                    pythonBridge,
                    null,
                    null
            );

            // Create flow
            final ProcessGroup rootGroup = flowManager.createProcessGroup("root");
            rootGroup.setName("root");
            flowManager.setRootGroup(rootGroup);

            final StatelessDataflow dataflow = statelessEngine.createFlow(dataflowDefinition);
            final long millis = System.currentTimeMillis() - start;
            logger.info("NiFi Stateless Engine and Dataflow created and initialized in {} millis", millis);

            return dataflow;
        } catch (final Exception e) {
            // Release whatever was created before the failure. Every step is guarded on its own so that a failure in one
            // cleanup call neither skips the remaining steps nor replaces the original exception; such failures are
            // recorded on the original exception as suppressed exceptions.
            if (provenanceRepo != null) {
                try {
                    provenanceRepo.close();
                } catch (final Exception cleanupFailure) {
                    e.addSuppressed(cleanupFailure);
                }
            }

            if (contentRepo != null) {
                try {
                    contentRepo.shutdown();
                } catch (final Exception cleanupFailure) {
                    e.addSuppressed(cleanupFailure);
                }
            }

            if (processScheduler != null) {
                try {
                    processScheduler.shutdown();
                } catch (final Exception cleanupFailure) {
                    e.addSuppressed(cleanupFailure);
                }
            }

            if (flowFileRepo != null) {
                try {
                    flowFileRepo.close();
                } catch (final Exception cleanupFailure) {
                    e.addSuppressed(cleanupFailure);
                }
            }

            if (flowFileEventRepo != null) {
                try {
                    flowFileEventRepo.close();
                } catch (final Exception cleanupFailure) {
                    e.addSuppressed(cleanupFailure);
                }
            }

            // 'e' is final and never reassigned, so this rethrows with the precise exception types of the try block.
            throw e;
        }
    }