in pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java [1956:2127]
private CompletableFuture<ManagedLedgerConfig> getManagedLedgerConfig(@NonNull TopicName topicName,
@Nullable TopicPolicies topicPolicies) {
requireNonNull(topicName);
NamespaceName namespace = topicName.getNamespaceObject();
ServiceConfiguration serviceConfig = pulsar.getConfiguration();
NamespaceResources nsr = pulsar.getPulsarResources().getNamespaceResources();
LocalPoliciesResources lpr = pulsar.getPulsarResources().getLocalPolicies();
final CompletableFuture<Optional<Policies>> nsPolicies = nsr.getPoliciesAsync(namespace);
final CompletableFuture<Optional<LocalPolicies>> lcPolicies = lpr.getLocalPoliciesAsync(namespace);
return nsPolicies.thenCombine(lcPolicies, (policies, localPolicies) -> {
PersistencePolicies persistencePolicies = null;
RetentionPolicies retentionPolicies = null;
OffloadPoliciesImpl topicLevelOffloadPolicies = null;
if (topicPolicies != null) {
persistencePolicies = topicPolicies.getPersistence();
retentionPolicies = topicPolicies.getRetentionPolicies();
topicLevelOffloadPolicies = topicPolicies.getOffloadPolicies();
}
if (persistencePolicies == null) {
persistencePolicies = policies.map(p -> p.persistence).orElseGet(
() -> new PersistencePolicies(serviceConfig.getManagedLedgerDefaultEnsembleSize(),
serviceConfig.getManagedLedgerDefaultWriteQuorum(),
serviceConfig.getManagedLedgerDefaultAckQuorum(),
serviceConfig.getManagedLedgerDefaultMarkDeleteRateLimit()));
}
if (retentionPolicies == null) {
if (SystemTopicNames.isSystemTopic(topicName)) {
if (log.isDebugEnabled()) {
log.debug("{} Disable data retention policy for system topic.", topicName);
}
retentionPolicies = new RetentionPolicies(0, 0);
} else {
retentionPolicies = policies.map(p -> p.retention_policies).orElseGet(
() -> new RetentionPolicies(serviceConfig.getDefaultRetentionTimeInMinutes(),
serviceConfig.getDefaultRetentionSizeInMB())
);
}
}
ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
managedLedgerConfig.setEnsembleSize(persistencePolicies.getBookkeeperEnsemble());
managedLedgerConfig.setWriteQuorumSize(persistencePolicies.getBookkeeperWriteQuorum());
managedLedgerConfig.setAckQuorumSize(persistencePolicies.getBookkeeperAckQuorum());
managedLedgerConfig.setStorageClassName(persistencePolicies.getManagedLedgerStorageClassName());
if (serviceConfig.isStrictBookieAffinityEnabled()) {
managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyClassName(
IsolatedBookieEnsemblePlacementPolicy.class);
if (localPolicies.isPresent() && localPolicies.get().bookieAffinityGroup != null) {
Map<String, Object> properties = new HashMap<>();
properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupPrimary());
properties.put(IsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS,
localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupSecondary());
managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
} else if (isSystemTopic(topicName)) {
Map<String, Object> properties = new HashMap<>();
properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, "*");
properties.put(IsolatedBookieEnsemblePlacementPolicy
.SECONDARY_ISOLATION_BOOKIE_GROUPS, "*");
managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
} else {
Map<String, Object> properties = new HashMap<>();
properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, "");
properties.put(IsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS, "");
managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
}
} else {
if (localPolicies.isPresent() && localPolicies.get().bookieAffinityGroup != null) {
managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyClassName(
IsolatedBookieEnsemblePlacementPolicy.class);
Map<String, Object> properties = new HashMap<>();
properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupPrimary());
properties.put(IsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS,
localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupSecondary());
managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
}
}
managedLedgerConfig.setThrottleMarkDelete(persistencePolicies.getManagedLedgerMaxMarkDeleteRate());
managedLedgerConfig.setDigestType(serviceConfig.getManagedLedgerDigestType());
managedLedgerConfig.setPassword(serviceConfig.getManagedLedgerPassword());
managedLedgerConfig
.setMaxUnackedRangesToPersist(serviceConfig.getManagedLedgerMaxUnackedRangesToPersist());
managedLedgerConfig
.setPersistIndividualAckAsLongArray(serviceConfig.isManagedLedgerPersistIndividualAckAsLongArray());
managedLedgerConfig.setPersistentUnackedRangesWithMultipleEntriesEnabled(
serviceConfig.isPersistentUnackedRangesWithMultipleEntriesEnabled());
managedLedgerConfig.setMaxUnackedRangesToPersistInMetadataStore(
serviceConfig.getManagedLedgerMaxUnackedRangesToPersistInMetadataStore());
managedLedgerConfig.setMaxEntriesPerLedger(serviceConfig.getManagedLedgerMaxEntriesPerLedger());
managedLedgerConfig
.setMinimumRolloverTime(serviceConfig.getManagedLedgerMinLedgerRolloverTimeMinutes(),
TimeUnit.MINUTES);
managedLedgerConfig
.setMaximumRolloverTime(serviceConfig.getManagedLedgerMaxLedgerRolloverTimeMinutes(),
TimeUnit.MINUTES);
managedLedgerConfig.setMaxSizePerLedgerMb(serviceConfig.getManagedLedgerMaxSizePerLedgerMbytes());
managedLedgerConfig.setMetadataOperationsTimeoutSeconds(
serviceConfig.getManagedLedgerMetadataOperationsTimeoutSeconds());
managedLedgerConfig
.setReadEntryTimeoutSeconds(serviceConfig.getManagedLedgerReadEntryTimeoutSeconds());
managedLedgerConfig
.setAddEntryTimeoutSeconds(serviceConfig.getManagedLedgerAddEntryTimeoutSeconds());
managedLedgerConfig.setMetadataEnsembleSize(serviceConfig.getManagedLedgerDefaultEnsembleSize());
managedLedgerConfig.setUnackedRangesOpenCacheSetEnabled(
serviceConfig.isManagedLedgerUnackedRangesOpenCacheSetEnabled());
managedLedgerConfig.setMetadataWriteQuorumSize(serviceConfig.getManagedLedgerDefaultWriteQuorum());
managedLedgerConfig.setMetadataAckQuorumSize(serviceConfig.getManagedLedgerDefaultAckQuorum());
managedLedgerConfig
.setMetadataMaxEntriesPerLedger(serviceConfig.getManagedLedgerCursorMaxEntriesPerLedger());
managedLedgerConfig
.setLedgerRolloverTimeout(serviceConfig.getManagedLedgerCursorRolloverTimeInSeconds());
managedLedgerConfig
.setRetentionTime(retentionPolicies.getRetentionTimeInMinutes(), TimeUnit.MINUTES);
managedLedgerConfig.setRetentionSizeInMB(retentionPolicies.getRetentionSizeInMB());
managedLedgerConfig.setAutoSkipNonRecoverableData(serviceConfig.isAutoSkipNonRecoverableData());
managedLedgerConfig.setLedgerForceRecovery(serviceConfig.isManagedLedgerForceRecovery());
managedLedgerConfig.setLazyCursorRecovery(serviceConfig.isLazyCursorRecovery());
managedLedgerConfig.setInactiveLedgerRollOverTime(
serviceConfig.getManagedLedgerInactiveLedgerRolloverTimeSeconds(), TimeUnit.SECONDS);
managedLedgerConfig.setCacheEvictionByMarkDeletedPosition(
serviceConfig.isCacheEvictionByMarkDeletedPosition());
managedLedgerConfig.setMinimumBacklogCursorsForCaching(
serviceConfig.getManagedLedgerMinimumBacklogCursorsForCaching());
managedLedgerConfig.setMinimumBacklogEntriesForCaching(
serviceConfig.getManagedLedgerMinimumBacklogEntriesForCaching());
managedLedgerConfig.setMaxBacklogBetweenCursorsForCaching(
serviceConfig.getManagedLedgerMaxBacklogBetweenCursorsForCaching());
OffloadPoliciesImpl nsLevelOffloadPolicies =
(OffloadPoliciesImpl) policies.map(p -> p.offload_policies).orElse(null);
OffloadPoliciesImpl offloadPolicies = OffloadPoliciesImpl.mergeConfiguration(
topicLevelOffloadPolicies,
OffloadPoliciesImpl.oldPoliciesCompatible(nsLevelOffloadPolicies, policies.orElse(null)),
getPulsar().getConfig().getProperties());
if (topicLevelOffloadPolicies != null) {
try {
LedgerOffloader topicLevelLedgerOffLoader = pulsar().createManagedLedgerOffloader(offloadPolicies);
managedLedgerConfig.setLedgerOffloader(topicLevelLedgerOffLoader);
} catch (PulsarServerException e) {
throw new RuntimeException(e);
}
} else {
//If the topic level policy is null, use the namespace level
managedLedgerConfig
.setLedgerOffloader(pulsar.getManagedLedgerOffloader(namespace, offloadPolicies));
}
if (managedLedgerConfig.getLedgerOffloader() != null
&& managedLedgerConfig.getLedgerOffloader().isAppendable()
&& (NamespaceService.isSystemServiceNamespace(namespace.toString())
|| SystemTopicNames.isSystemTopic(topicName))) {
managedLedgerConfig.setLedgerOffloader(
new NonAppendableLedgerOffloader(managedLedgerConfig.getLedgerOffloader()));
}
managedLedgerConfig.setTriggerOffloadOnTopicLoad(serviceConfig.isTriggerOffloadOnTopicLoad());
managedLedgerConfig.setDeletionAtBatchIndexLevelEnabled(
serviceConfig.isAcknowledgmentAtBatchIndexLevelEnabled());
managedLedgerConfig.setNewEntriesCheckDelayInMillis(
serviceConfig.getManagedLedgerNewEntriesCheckDelayInMillis());
return managedLedgerConfig;
});
}