private CompletableFuture getManagedLedgerConfig()

in pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java [1956:2127]


    /**
     * Asynchronously assembles the {@link ManagedLedgerConfig} to use for the given topic.
     *
     * <p>The namespace policies and the namespace local policies are looked up asynchronously and combined.
     * Persistence and retention settings come from the topic-level policies when present, otherwise from the
     * namespace policies, otherwise from the broker {@link ServiceConfiguration}. All remaining settings are
     * copied from the broker configuration, followed by the bookie placement policy and the ledger offloader.
     *
     * @param topicName     topic the configuration is built for; must not be {@code null}
     * @param topicPolicies topic-level policies, or {@code null} when there are none
     * @return a future completed with the populated configuration; it completes exceptionally when either
     *         policy lookup fails or when creating a topic-level ledger offloader throws
     */
    private CompletableFuture<ManagedLedgerConfig> getManagedLedgerConfig(@NonNull TopicName topicName,
                                                                          @Nullable TopicPolicies topicPolicies) {
        requireNonNull(topicName);
        NamespaceName namespace = topicName.getNamespaceObject();
        ServiceConfiguration serviceConfig = pulsar.getConfiguration();

        NamespaceResources nsr = pulsar.getPulsarResources().getNamespaceResources();
        LocalPoliciesResources lpr = pulsar.getPulsarResources().getLocalPolicies();
        final CompletableFuture<Optional<Policies>> nsPolicies = nsr.getPoliciesAsync(namespace);
        final CompletableFuture<Optional<LocalPolicies>> lcPolicies = lpr.getLocalPoliciesAsync(namespace);
        return nsPolicies.thenCombine(lcPolicies, (policies, localPolicies) -> {
            PersistencePolicies persistencePolicies =
                    resolveEffectivePersistencePolicies(topicPolicies, policies, serviceConfig);
            RetentionPolicies retentionPolicies =
                    resolveEffectiveRetentionPolicies(topicName, topicPolicies, policies, serviceConfig);
            OffloadPoliciesImpl topicLevelOffloadPolicies =
                    topicPolicies != null ? topicPolicies.getOffloadPolicies() : null;

            // NOTE(review): the setters below are applied in exactly the order they always were.
            // ManagedLedgerConfig may validate some values against previously set ones (for example the
            // minimum vs. maximum rollover time) - confirm before reordering any of them.
            ManagedLedgerConfig mlConfig = new ManagedLedgerConfig();
            mlConfig.setEnsembleSize(persistencePolicies.getBookkeeperEnsemble());
            mlConfig.setWriteQuorumSize(persistencePolicies.getBookkeeperWriteQuorum());
            mlConfig.setAckQuorumSize(persistencePolicies.getBookkeeperAckQuorum());
            mlConfig.setStorageClassName(persistencePolicies.getManagedLedgerStorageClassName());

            applyBookiePlacementPolicy(mlConfig, topicName, localPolicies, serviceConfig);

            mlConfig.setThrottleMarkDelete(persistencePolicies.getManagedLedgerMaxMarkDeleteRate());
            mlConfig.setDigestType(serviceConfig.getManagedLedgerDigestType());
            mlConfig.setPassword(serviceConfig.getManagedLedgerPassword());

            mlConfig.setMaxUnackedRangesToPersist(serviceConfig.getManagedLedgerMaxUnackedRangesToPersist());
            mlConfig.setPersistIndividualAckAsLongArray(
                    serviceConfig.isManagedLedgerPersistIndividualAckAsLongArray());
            mlConfig.setPersistentUnackedRangesWithMultipleEntriesEnabled(
                    serviceConfig.isPersistentUnackedRangesWithMultipleEntriesEnabled());
            mlConfig.setMaxUnackedRangesToPersistInMetadataStore(
                    serviceConfig.getManagedLedgerMaxUnackedRangesToPersistInMetadataStore());
            mlConfig.setMaxEntriesPerLedger(serviceConfig.getManagedLedgerMaxEntriesPerLedger());
            mlConfig.setMinimumRolloverTime(
                    serviceConfig.getManagedLedgerMinLedgerRolloverTimeMinutes(), TimeUnit.MINUTES);
            mlConfig.setMaximumRolloverTime(
                    serviceConfig.getManagedLedgerMaxLedgerRolloverTimeMinutes(), TimeUnit.MINUTES);
            mlConfig.setMaxSizePerLedgerMb(serviceConfig.getManagedLedgerMaxSizePerLedgerMbytes());

            mlConfig.setMetadataOperationsTimeoutSeconds(
                    serviceConfig.getManagedLedgerMetadataOperationsTimeoutSeconds());
            mlConfig.setReadEntryTimeoutSeconds(serviceConfig.getManagedLedgerReadEntryTimeoutSeconds());
            mlConfig.setAddEntryTimeoutSeconds(serviceConfig.getManagedLedgerAddEntryTimeoutSeconds());
            // The metadata (cursor) ledgers reuse the broker-wide default ensemble/quorum settings.
            mlConfig.setMetadataEnsembleSize(serviceConfig.getManagedLedgerDefaultEnsembleSize());
            mlConfig.setUnackedRangesOpenCacheSetEnabled(
                    serviceConfig.isManagedLedgerUnackedRangesOpenCacheSetEnabled());
            mlConfig.setMetadataWriteQuorumSize(serviceConfig.getManagedLedgerDefaultWriteQuorum());
            mlConfig.setMetadataAckQuorumSize(serviceConfig.getManagedLedgerDefaultAckQuorum());
            mlConfig.setMetadataMaxEntriesPerLedger(serviceConfig.getManagedLedgerCursorMaxEntriesPerLedger());

            mlConfig.setLedgerRolloverTimeout(serviceConfig.getManagedLedgerCursorRolloverTimeInSeconds());
            mlConfig.setRetentionTime(retentionPolicies.getRetentionTimeInMinutes(), TimeUnit.MINUTES);
            mlConfig.setRetentionSizeInMB(retentionPolicies.getRetentionSizeInMB());
            mlConfig.setAutoSkipNonRecoverableData(serviceConfig.isAutoSkipNonRecoverableData());
            mlConfig.setLedgerForceRecovery(serviceConfig.isManagedLedgerForceRecovery());
            mlConfig.setLazyCursorRecovery(serviceConfig.isLazyCursorRecovery());
            mlConfig.setInactiveLedgerRollOverTime(
                    serviceConfig.getManagedLedgerInactiveLedgerRolloverTimeSeconds(), TimeUnit.SECONDS);
            mlConfig.setCacheEvictionByMarkDeletedPosition(serviceConfig.isCacheEvictionByMarkDeletedPosition());
            mlConfig.setMinimumBacklogCursorsForCaching(
                    serviceConfig.getManagedLedgerMinimumBacklogCursorsForCaching());
            mlConfig.setMinimumBacklogEntriesForCaching(
                    serviceConfig.getManagedLedgerMinimumBacklogEntriesForCaching());
            mlConfig.setMaxBacklogBetweenCursorsForCaching(
                    serviceConfig.getManagedLedgerMaxBacklogBetweenCursorsForCaching());

            applyLedgerOffloader(mlConfig, topicName, namespace, topicLevelOffloadPolicies, policies);

            mlConfig.setTriggerOffloadOnTopicLoad(serviceConfig.isTriggerOffloadOnTopicLoad());

            mlConfig.setDeletionAtBatchIndexLevelEnabled(serviceConfig.isAcknowledgmentAtBatchIndexLevelEnabled());
            mlConfig.setNewEntriesCheckDelayInMillis(serviceConfig.getManagedLedgerNewEntriesCheckDelayInMillis());
            return mlConfig;
        });
    }

    /**
     * Picks the persistence policies for a topic: topic level first, then namespace level, then a policy built
     * from the broker's managed-ledger defaults.
     */
    private static PersistencePolicies resolveEffectivePersistencePolicies(@Nullable TopicPolicies topicPolicies,
                                                                           Optional<Policies> policies,
                                                                           ServiceConfiguration serviceConfig) {
        PersistencePolicies topicLevel = topicPolicies != null ? topicPolicies.getPersistence() : null;
        if (topicLevel != null) {
            return topicLevel;
        }
        // Optional.map yields an empty Optional when the namespace has no persistence policy set,
        // so the broker defaults also cover that case.
        return policies.map(p -> p.persistence).orElseGet(
                () -> new PersistencePolicies(serviceConfig.getManagedLedgerDefaultEnsembleSize(),
                        serviceConfig.getManagedLedgerDefaultWriteQuorum(),
                        serviceConfig.getManagedLedgerDefaultAckQuorum(),
                        serviceConfig.getManagedLedgerDefaultMarkDeleteRateLimit()));
    }

    /**
     * Picks the retention policies for a topic: topic level first; otherwise a zero/zero policy for system
     * topics; otherwise namespace level, falling back to the broker's default retention time and size.
     */
    private RetentionPolicies resolveEffectiveRetentionPolicies(TopicName topicName,
                                                                @Nullable TopicPolicies topicPolicies,
                                                                Optional<Policies> policies,
                                                                ServiceConfiguration serviceConfig) {
        RetentionPolicies topicLevel = topicPolicies != null ? topicPolicies.getRetentionPolicies() : null;
        if (topicLevel != null) {
            return topicLevel;
        }
        if (SystemTopicNames.isSystemTopic(topicName)) {
            if (log.isDebugEnabled()) {
                log.debug("{} Disable data retention policy for system topic.", topicName);
            }
            return new RetentionPolicies(0, 0);
        }
        return policies.map(p -> p.retention_policies).orElseGet(
                () -> new RetentionPolicies(serviceConfig.getDefaultRetentionTimeInMinutes(),
                        serviceConfig.getDefaultRetentionSizeInMB()));
    }

    /**
     * Configures the isolated bookie ensemble placement policy on {@code mlConfig} when it applies.
     *
     * <p>Nothing is changed when strict bookie affinity is disabled and the namespace has no bookie affinity
     * group. Otherwise the isolated placement policy class is set, with isolation groups taken from the
     * namespace's affinity group when there is one, {@code "*"} for system topics, and {@code ""} for all
     * other topics.
     */
    private void applyBookiePlacementPolicy(ManagedLedgerConfig mlConfig, TopicName topicName,
                                            Optional<LocalPolicies> localPolicies,
                                            ServiceConfiguration serviceConfig) {
        boolean strictAffinity = serviceConfig.isStrictBookieAffinityEnabled();
        boolean hasAffinityGroup = localPolicies.isPresent() && localPolicies.get().bookieAffinityGroup != null;
        if (!strictAffinity && !hasAffinityGroup) {
            return;
        }

        mlConfig.setBookKeeperEnsemblePlacementPolicyClassName(IsolatedBookieEnsemblePlacementPolicy.class);
        final Map<String, Object> properties;
        if (hasAffinityGroup) {
            properties = bookieIsolationProperties(
                    localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupPrimary(),
                    localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupSecondary());
        } else if (isSystemTopic(topicName)) {
            // Only reachable with strict affinity enabled and no affinity group on the namespace.
            // NOTE(review): this uses the unqualified isSystemTopic(...) while the retention and offloader
            // logic use SystemTopicNames.isSystemTopic(...) - confirm the difference is intentional.
            properties = bookieIsolationProperties("*", "*");
        } else {
            properties = bookieIsolationProperties("", "");
        }
        mlConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
    }

    /**
     * Builds the placement policy properties map holding the primary and secondary isolation groups.
     * Values are stored as given, including {@code null}.
     */
    private static Map<String, Object> bookieIsolationProperties(Object primaryGroups, Object secondaryGroups) {
        Map<String, Object> properties = new HashMap<>();
        properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, primaryGroups);
        properties.put(IsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS, secondaryGroups);
        return properties;
    }

    /**
     * Selects and installs the ledger offloader on {@code mlConfig}.
     *
     * <p>Topic-level and namespace-level offload policies are merged with the broker properties. A dedicated
     * offloader is created when topic-level offload policies exist; otherwise the namespace offloader is used.
     * An appendable offloader is wrapped in a {@link NonAppendableLedgerOffloader} for topics in a system
     * service namespace and for system topics.
     *
     * @throws RuntimeException wrapping the {@link PulsarServerException} raised while creating a topic-level
     *                          offloader
     */
    private void applyLedgerOffloader(ManagedLedgerConfig mlConfig, TopicName topicName, NamespaceName namespace,
                                      @Nullable OffloadPoliciesImpl topicLevelOffloadPolicies,
                                      Optional<Policies> policies) {
        OffloadPoliciesImpl nsLevelOffloadPolicies =
                (OffloadPoliciesImpl) policies.map(p -> p.offload_policies).orElse(null);
        OffloadPoliciesImpl offloadPolicies = OffloadPoliciesImpl.mergeConfiguration(
                topicLevelOffloadPolicies,
                OffloadPoliciesImpl.oldPoliciesCompatible(nsLevelOffloadPolicies, policies.orElse(null)),
                getPulsar().getConfig().getProperties());
        if (topicLevelOffloadPolicies != null) {
            try {
                LedgerOffloader topicLevelLedgerOffLoader = pulsar().createManagedLedgerOffloader(offloadPolicies);
                mlConfig.setLedgerOffloader(topicLevelLedgerOffLoader);
            } catch (PulsarServerException e) {
                throw new RuntimeException(e);
            }
        } else {
            //If the topic level policy is null, use the namespace level
            mlConfig.setLedgerOffloader(pulsar.getManagedLedgerOffloader(namespace, offloadPolicies));
        }

        LedgerOffloader installed = mlConfig.getLedgerOffloader();
        if (installed != null
                && installed.isAppendable()
                && (NamespaceService.isSystemServiceNamespace(namespace.toString())
                        || SystemTopicNames.isSystemTopic(topicName))) {
            mlConfig.setLedgerOffloader(new NonAppendableLedgerOffloader(installed));
        }
    }