in pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java [402:591]
/**
 * Starts this function worker service.
 *
 * <p>The startup sequence, in order, is: optionally build the dlog namespace and the state store
 * provider, create the admin and Pulsar clients, make sure the internal topics and the coordination
 * subscription exist, construct the managers, start the leader service, initialize the metadata and
 * runtime managers, start the assignment tailer and the metadata manager, register and start the
 * periodic cluster tasks, and finally mark the service as initialized and wire the stats manager.
 * The order of these steps is significant and must be preserved.
 *
 * @param authenticationService stored on this service for later use
 * @param authorizationService  stored on this service for later use
 * @param errorNotifier         handed to the managers, the assignment tailer and the leader service
 *                              created here
 * @throws Exception declared for interface compatibility; every failure raised inside this method is
 *                   logged and rethrown wrapped in a {@link RuntimeException}
 */
public void start(AuthenticationService authenticationService,
                  AuthorizationService authorizationService,
                  ErrorNotifier errorNotifier) throws Exception {

    workerStatsManager.startupTimeStart();
    log.info("/** Starting worker id={} **/", workerConfig.getWorkerId());
    log.info("Worker Configs: {}", workerConfig);

    try {
        // The dlog namespace is only built when a dlog URI has been configured.
        if (dlogUri != null) {
            DistributedLogConfiguration dlogConf = WorkerUtils.getDlogConf(workerConfig);
            try {
                this.dlogNamespace = NamespaceBuilder.newBuilder()
                        .conf(dlogConf)
                        .clientId("function-worker-" + workerConfig.getWorkerId())
                        .uri(dlogUri)
                        .build();
            } catch (Exception e) {
                log.error("Failed to initialize dlog namespace {} for storing function packages", dlogUri, e);
                throw new RuntimeException(e);
            }
        }

        // create the state storage provider for accessing function state
        if (workerConfig.getStateStorageServiceUrl() != null) {
            // The provider implementation is loaded reflectively through its no-arg constructor.
            this.stateStoreProvider =
                    (StateStoreProvider) Class.forName(workerConfig.getStateStorageProviderImplementation())
                            .getConstructor().newInstance();
            Map<String, Object> stateStoreProviderConfig = new HashMap<>();
            stateStoreProviderConfig.put(StateStoreProvider.STATE_STORAGE_SERVICE_URL,
                    workerConfig.getStateStorageServiceUrl());
            this.stateStoreProvider.init(stateStoreProviderConfig);
        }

        // An explicitly configured function web service URL wins; otherwise fall back to the worker's
        // own web address (TLS variant when TLS is enabled).
        final String functionWebServiceUrl = StringUtils.isNotBlank(workerConfig.getFunctionWebServiceUrl())
                ? workerConfig.getFunctionWebServiceUrl()
                : (workerConfig.getTlsEnabled()
                        ? workerConfig.getWorkerWebAddressTls() : workerConfig.getWorkerWebAddress());

        this.brokerAdmin = clientCreator.newPulsarAdmin(workerConfig.getPulsarWebServiceUrl(), workerConfig);
        this.functionAdmin = clientCreator.newPulsarAdmin(functionWebServiceUrl, workerConfig);
        this.client = clientCreator.newPulsarClient(workerConfig.getPulsarServiceUrl(), workerConfig);

        tryCreateNonPartitionedTopic(workerConfig.getFunctionAssignmentTopic());
        tryCreateNonPartitionedTopic(workerConfig.getClusterCoordinationTopic());
        tryCreateNonPartitionedTopic(workerConfig.getFunctionMetadataTopic());

        //create scheduler manager
        this.schedulerManager = new SchedulerManager(workerConfig, client, getBrokerAdmin(), workerStatsManager,
                errorNotifier);

        //create function meta data manager
        this.functionMetaDataManager = new FunctionMetaDataManager(
                this.workerConfig, this.schedulerManager, this.client, errorNotifier);

        this.connectorsManager = new ConnectorsManager(workerConfig);
        this.functionsManager = new FunctionsManager(workerConfig);

        //create membership manager
        // Create the coordination subscription (positioned at the earliest message) if it is missing.
        String coordinationTopic = workerConfig.getClusterCoordinationTopic();
        if (!getBrokerAdmin().topics().getSubscriptions(coordinationTopic)
                .contains(MembershipManager.COORDINATION_TOPIC_SUBSCRIPTION)) {
            getBrokerAdmin().topics()
                    .createSubscription(coordinationTopic, MembershipManager.COORDINATION_TOPIC_SUBSCRIPTION,
                            MessageId.earliest);
        }
        this.membershipManager = new MembershipManager(this, client, getBrokerAdmin());

        // create function runtime manager
        this.functionRuntimeManager = new FunctionRuntimeManager(
                workerConfig,
                this,
                dlogNamespace,
                membershipManager,
                connectorsManager,
                functionsManager,
                functionMetaDataManager,
                workerStatsManager,
                errorNotifier);

        // initialize function assignment tailer that reads from the assignment topic
        this.functionAssignmentTailer = new FunctionAssignmentTailer(
                functionRuntimeManager,
                client.newReader(),
                workerConfig,
                errorNotifier);

        // Start worker early in the worker service init process so that functions don't get re-assigned because
        // initialize operations of FunctionRuntimeManager and FunctionMetaDataManager might take a while
        this.leaderService = new LeaderService(this,
                client,
                functionAssignmentTailer,
                schedulerManager,
                functionRuntimeManager,
                functionMetaDataManager,
                membershipManager,
                errorNotifier);

        log.info("/** Start Leader Service **/");
        leaderService.start();

        // initialize function metadata manager
        log.info("/** Initializing Metadata Manager **/");
        functionMetaDataManager.initialize();

        // initialize function runtime manager
        log.info("/** Initializing Runtime Manager **/");
        // The returned message id is where the assignment tailer is started from further below.
        MessageId lastAssignmentMessageId = functionRuntimeManager.initialize();

        Supplier<Boolean> checkIsStillLeader = WorkerUtils.getIsStillLeaderSupplier(membershipManager,
                workerConfig.getWorkerId());

        // Setting references to managers in scheduler
        schedulerManager.setFunctionMetaDataManager(functionMetaDataManager);
        schedulerManager.setFunctionRuntimeManager(functionRuntimeManager);
        schedulerManager.setMembershipManager(membershipManager);
        schedulerManager.setLeaderService(leaderService);

        this.authenticationService = authenticationService;
        this.authorizationService = authorizationService;

        // Start function assignment tailer
        log.info("/** Starting Function Assignment Tailer **/");
        functionAssignmentTailer.startFromMessage(lastAssignmentMessageId);

        // start function metadata manager
        log.info("/** Starting Metadata Manager **/");
        functionMetaDataManager.start();

        // Starting cluster services
        this.clusterServiceCoordinator = new ClusterServiceCoordinator(
                workerConfig.getWorkerId(),
                leaderService,
                checkIsStillLeader);

        clusterServiceCoordinator.addTask("membership-monitor",
                workerConfig.getFailureCheckFreqMs(),
                () -> {
                    // computing a new schedule and checking for failures cannot happen concurrently
                    // both paths of code modify internally cached assignments map in function runtime manager
                    schedulerManager.getSchedulerLock().lock();
                    try {
                        membershipManager.checkFailures(
                                functionMetaDataManager, functionRuntimeManager, schedulerManager);
                    } finally {
                        schedulerManager.getSchedulerLock().unlock();
                    }
                });

        if (workerConfig.getRebalanceCheckFreqSec() > 0) {
            clusterServiceCoordinator.addTask("rebalance-periodic-check",
                    // Multiply by a long literal so the seconds-to-milliseconds conversion is carried out
                    // in 64-bit arithmetic and cannot wrap around for large configured values.
                    workerConfig.getRebalanceCheckFreqSec() * 1000L,
                    () -> {
                        try {
                            schedulerManager.rebalanceIfNotInprogress().get();
                        } catch (SchedulerManager.RebalanceInProgressException e) {
                            log.info("Scheduled for rebalance but rebalance is already in progress. Ignoring.");
                        } catch (Exception e) {
                            if (e instanceof InterruptedException) {
                                // Do not lose the interrupt: restore the flag so the thread running this
                                // task can still observe that it was asked to stop.
                                Thread.currentThread().interrupt();
                            }
                            log.warn("Encountered error when running scheduled rebalance", e);
                        }
                    });
        }

        if (workerConfig.getWorkerListProbeIntervalSec() > 0) {
            clusterServiceCoordinator.addTask("drain-worker-list-probe-periodic-check",
                    workerConfig.getWorkerListProbeIntervalSec() * 1000L,
                    () -> {
                        schedulerManager.updateWorkerDrainMap();
                    });
        }

        log.info("/** Starting Cluster Service Coordinator **/");
        clusterServiceCoordinator.start();

        // indicate function worker service is done initializing
        this.isInitialized = true;

        log.info("/** Started worker id={} **/", workerConfig.getWorkerId());

        workerStatsManager.setFunctionRuntimeManager(functionRuntimeManager);
        workerStatsManager.setFunctionMetaDataManager(functionMetaDataManager);
        workerStatsManager.setLeaderService(leaderService);
        workerStatsManager.setIsLeader(checkIsStillLeader);
        workerStatsManager.startupTimeEnd();
    } catch (Throwable t) {
        // Any failure during startup, including Errors, is logged and surfaced as a RuntimeException.
        log.error("Error Starting up in worker", t);
        throw new RuntimeException(t);
    }
}