public void start()

in pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java [402:591]


    /**
     * Starts this function worker service.
     *
     * <p>The startup sequence is order-sensitive and proceeds as follows:
     * <ol>
     *   <li>optionally build the dlog namespace (when {@code dlogUri} is set) and the state store provider
     *       (when a state storage service URL is configured);</li>
     *   <li>create the broker admin, function admin and Pulsar client;</li>
     *   <li>create the assignment, coordination and metadata topics, then construct the scheduler,
     *       metadata, connectors, functions, membership and runtime managers;</li>
     *   <li>start the leader service <em>before</em> the managers are initialized;</li>
     *   <li>initialize the metadata and runtime managers, start the assignment tailer from the message id
     *       returned by the runtime manager, and start the metadata manager;</li>
     *   <li>register and start the periodic cluster tasks and mark the service as initialized.</li>
     * </ol>
     *
     * @param authenticationService authentication service stored on this worker service
     * @param authorizationService  authorization service stored on this worker service
     * @param errorNotifier         notifier handed to the managers created during startup
     * @throws RuntimeException wrapping any {@link Throwable} raised while starting up
     * @throws Exception        declared by the signature; failures inside the startup sequence are
     *                          rethrown wrapped in a {@link RuntimeException}
     */
    public void start(AuthenticationService authenticationService,
                      AuthorizationService authorizationService,
                      ErrorNotifier errorNotifier) throws Exception {

        workerStatsManager.startupTimeStart();
        log.info("/** Starting worker id={} **/", workerConfig.getWorkerId());
        log.info("Worker Configs: {}", workerConfig);

        try {
            // create the dlog namespace used for storing function packages (only when a dlog URI is set)
            if (dlogUri != null) {
                DistributedLogConfiguration dlogConf = WorkerUtils.getDlogConf(workerConfig);
                try {
                    this.dlogNamespace = NamespaceBuilder.newBuilder()
                            .conf(dlogConf)
                            .clientId("function-worker-" + workerConfig.getWorkerId())
                            .uri(dlogUri)
                            .build();
                } catch (Exception e) {
                    log.error("Failed to initialize dlog namespace {} for storing function packages", dlogUri, e);
                    throw new RuntimeException(e);
                }
            }

            // create the state storage provider for accessing function state
            if (workerConfig.getStateStorageServiceUrl() != null) {
                // the provider implementation is loaded reflectively from the configured class name
                this.stateStoreProvider =
                        (StateStoreProvider) Class.forName(workerConfig.getStateStorageProviderImplementation())
                                .getConstructor().newInstance();
                Map<String, Object> stateStoreProviderConfig = new HashMap<>();
                stateStoreProviderConfig.put(StateStoreProvider.STATE_STORAGE_SERVICE_URL,
                        workerConfig.getStateStorageServiceUrl());
                this.stateStoreProvider.init(stateStoreProviderConfig);
            }

            // an explicitly configured function web service URL wins; otherwise fall back to the worker's
            // own web address (TLS variant when TLS is enabled)
            final String functionWebServiceUrl = StringUtils.isNotBlank(workerConfig.getFunctionWebServiceUrl())
                    ? workerConfig.getFunctionWebServiceUrl()
                    : (workerConfig.getTlsEnabled()
                        ? workerConfig.getWorkerWebAddressTls() : workerConfig.getWorkerWebAddress());

            this.brokerAdmin = clientCreator.newPulsarAdmin(workerConfig.getPulsarWebServiceUrl(), workerConfig);
            this.functionAdmin = clientCreator.newPulsarAdmin(functionWebServiceUrl, workerConfig);
            this.client = clientCreator.newPulsarClient(workerConfig.getPulsarServiceUrl(), workerConfig);

            // the internal topics are created before any manager that is handed the client is constructed
            tryCreateNonPartitionedTopic(workerConfig.getFunctionAssignmentTopic());
            tryCreateNonPartitionedTopic(workerConfig.getClusterCoordinationTopic());
            tryCreateNonPartitionedTopic(workerConfig.getFunctionMetadataTopic());

            // create scheduler manager
            this.schedulerManager = new SchedulerManager(workerConfig, client, getBrokerAdmin(), workerStatsManager,
                    errorNotifier);

            // create function meta data manager
            this.functionMetaDataManager = new FunctionMetaDataManager(
                    this.workerConfig, this.schedulerManager, this.client, errorNotifier);

            this.connectorsManager = new ConnectorsManager(workerConfig);
            this.functionsManager = new FunctionsManager(workerConfig);

            // create membership manager; the coordination subscription is created at the earliest
            // position if it does not exist yet
            String coordinationTopic = workerConfig.getClusterCoordinationTopic();
            if (!getBrokerAdmin().topics().getSubscriptions(coordinationTopic)
                    .contains(MembershipManager.COORDINATION_TOPIC_SUBSCRIPTION)) {
                getBrokerAdmin().topics()
                        .createSubscription(coordinationTopic, MembershipManager.COORDINATION_TOPIC_SUBSCRIPTION,
                                MessageId.earliest);
            }
            this.membershipManager = new MembershipManager(this, client, getBrokerAdmin());

            // create function runtime manager
            this.functionRuntimeManager = new FunctionRuntimeManager(
                    workerConfig,
                    this,
                    dlogNamespace,
                    membershipManager,
                    connectorsManager,
                    functionsManager,
                    functionMetaDataManager,
                    workerStatsManager,
                    errorNotifier);

            // initialize function assignment tailer that reads from the assignment topic
            this.functionAssignmentTailer = new FunctionAssignmentTailer(
                    functionRuntimeManager,
                    client.newReader(),
                    workerConfig,
                    errorNotifier);

            // Start worker early in the worker service init process so that functions don't get re-assigned because
            // initialize operations of FunctionRuntimeManager and FunctionMetaDataManager might take a while
            this.leaderService = new LeaderService(this,
                    client,
                    functionAssignmentTailer,
                    schedulerManager,
                    functionRuntimeManager,
                    functionMetaDataManager,
                    membershipManager,
                    errorNotifier);

            log.info("/** Start Leader Service **/");
            leaderService.start();

            // initialize function metadata manager
            log.info("/** Initializing Metadata Manager **/");
            functionMetaDataManager.initialize();

            // initialize function runtime manager
            log.info("/** Initializing Runtime Manager **/");

            // the returned message id is where the assignment tailer is started from further below
            MessageId lastAssignmentMessageId = functionRuntimeManager.initialize();
            Supplier<Boolean> checkIsStillLeader = WorkerUtils.getIsStillLeaderSupplier(membershipManager,
                    workerConfig.getWorkerId());

            // Setting references to managers in scheduler
            schedulerManager.setFunctionMetaDataManager(functionMetaDataManager);
            schedulerManager.setFunctionRuntimeManager(functionRuntimeManager);
            schedulerManager.setMembershipManager(membershipManager);
            schedulerManager.setLeaderService(leaderService);

            this.authenticationService = authenticationService;

            this.authorizationService = authorizationService;

            // Start function assignment tailer
            log.info("/** Starting Function Assignment Tailer **/");
            functionAssignmentTailer.startFromMessage(lastAssignmentMessageId);

            // start function metadata manager
            log.info("/** Starting Metadata Manager **/");
            functionMetaDataManager.start();

            // Starting cluster services
            this.clusterServiceCoordinator = new ClusterServiceCoordinator(
                    workerConfig.getWorkerId(),
                    leaderService,
                    checkIsStillLeader);

            clusterServiceCoordinator.addTask("membership-monitor",
                    workerConfig.getFailureCheckFreqMs(),
                    () -> {
                        // computing a new schedule and checking for failures cannot happen concurrently
                        // both paths of code modify internally cached assignments map in function runtime manager
                        schedulerManager.getSchedulerLock().lock();
                        try {
                            membershipManager.checkFailures(
                                    functionMetaDataManager, functionRuntimeManager, schedulerManager);
                        } finally {
                            schedulerManager.getSchedulerLock().unlock();
                        }
                    });

            if (workerConfig.getRebalanceCheckFreqSec() > 0) {
                clusterServiceCoordinator.addTask("rebalance-periodic-check",
                        // long multiplier: keeps the seconds -> millis conversion out of int arithmetic,
                        // consistent with the drain-worker probe task below
                        workerConfig.getRebalanceCheckFreqSec() * 1000L,
                        () -> {
                            try {
                                schedulerManager.rebalanceIfNotInprogress().get();
                            } catch (SchedulerManager.RebalanceInProgressException e) {
                                log.info("Scheduled for rebalance but rebalance is already in progress. Ignoring.");
                            } catch (Exception e) {
                                if (e instanceof InterruptedException) {
                                    // do not swallow the interruption: restore the flag for the executing thread
                                    Thread.currentThread().interrupt();
                                }
                                log.warn("Encountered error when running scheduled rebalance", e);
                            }
                        });
            }

            if (workerConfig.getWorkerListProbeIntervalSec() > 0) {
                clusterServiceCoordinator.addTask("drain-worker-list-probe-periodic-check",
                        workerConfig.getWorkerListProbeIntervalSec() * 1000L,
                        () -> {
                            schedulerManager.updateWorkerDrainMap();
                        });
            }

            log.info("/** Starting Cluster Service Coordinator **/");
            clusterServiceCoordinator.start();

            // indicate function worker service is done initializing
            this.isInitialized = true;

            log.info("/** Started worker id={} **/", workerConfig.getWorkerId());

            workerStatsManager.setFunctionRuntimeManager(functionRuntimeManager);
            workerStatsManager.setFunctionMetaDataManager(functionMetaDataManager);
            workerStatsManager.setLeaderService(leaderService);
            workerStatsManager.setIsLeader(checkIsStillLeader);
            workerStatsManager.startupTimeEnd();
        } catch (Throwable t) {
            // any startup failure is logged and surfaced to the caller as an unchecked exception
            log.error("Error Starting up in worker", t);
            throw new RuntimeException(t);
        }
    }