public EnsemblePlacementPolicy initialize()

in bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ZoneawareEnsemblePlacementPolicyImpl.java [181:269]


    public EnsemblePlacementPolicy initialize(ClientConfiguration conf,
            Optional<DNSToSwitchMapping> optionalDnsResolver, HashedWheelTimer timer, FeatureProvider featureProvider,
            StatsLogger statsLogger, BookieAddressResolver bookieAddressResolver) {
        this.statsLogger = statsLogger;
        this.bookieAddressResolver = bookieAddressResolver;
        this.timer = timer;
        this.bookiesJoinedCounter = statsLogger.getOpStatsLogger(BOOKIES_JOINED);
        this.bookiesLeftCounter = statsLogger.getOpStatsLogger(BOOKIES_LEFT);
        this.failedToResolveNetworkLocationCounter = statsLogger.getCounter(FAILED_TO_RESOLVE_NETWORK_LOCATION_COUNT);
        this.numWritableBookiesInDefaultFaultDomain = new Gauge<Integer>() {
            @Override
            public Integer getDefaultValue() {
                return 0;
            }

            @Override
            public Integer getSample() {
                rwLock.readLock().lock();
                try {
                    return topology.countNumOfAvailableNodes(getDefaultFaultDomain(), Collections.emptySet());
                } finally {
                    rwLock.readLock().unlock();
                }
            }
        };
        this.statsLogger.registerGauge(NUM_WRITABLE_BOOKIES_IN_DEFAULT_FAULTDOMAIN,
                numWritableBookiesInDefaultFaultDomain);
        this.reorderThresholdPendingRequests = conf.getReorderThresholdPendingRequests();
        this.isWeighted = conf.getDiskWeightBasedPlacementEnabled();
        if (this.isWeighted) {
            this.maxWeightMultiple = conf.getBookieMaxWeightMultipleForWeightBasedPlacement();
            this.weightedSelection = new DynamicWeightedRandomSelectionImpl<BookieNode>(this.maxWeightMultiple);
            LOG.info("Weight based placement with max multiple of {}", this.maxWeightMultiple);
        } else {
            LOG.info("Not weighted");
        }
        this.minNumZonesPerWriteQuorum = conf.getMinNumZonesPerWriteQuorum();
        this.desiredNumZonesPerWriteQuorum = conf.getDesiredNumZonesPerWriteQuorum();
        this.enforceStrictZoneawarePlacement = conf.getEnforceStrictZoneawarePlacement();
        if (minNumZonesPerWriteQuorum > desiredNumZonesPerWriteQuorum) {
            LOG.error(
                    "It is misconfigured, for ZoneawareEnsemblePlacementPolicy, minNumZonesPerWriteQuorum: {} cann't be"
                            + " greater than desiredNumZonesPerWriteQuorum: {}",
                    minNumZonesPerWriteQuorum, desiredNumZonesPerWriteQuorum);
            throw new IllegalArgumentException("minNumZonesPerWriteQuorum: " + minNumZonesPerWriteQuorum
                    + " cann't be greater than desiredNumZonesPerWriteQuorum: " + desiredNumZonesPerWriteQuorum);
        }
        DNSToSwitchMapping actualDNSResolver;
        if (optionalDnsResolver.isPresent()) {
            actualDNSResolver = optionalDnsResolver.get();
        } else {
            String dnsResolverName = conf.getString(REPP_DNS_RESOLVER_CLASS, ScriptBasedMapping.class.getName());
            actualDNSResolver = ReflectionUtils.newInstance(dnsResolverName, DNSToSwitchMapping.class);
            actualDNSResolver.setBookieAddressResolver(bookieAddressResolver);
            if (actualDNSResolver instanceof Configurable) {
                ((Configurable) actualDNSResolver).setConf(conf);
            }
        }

        this.dnsResolver = new DNSResolverDecorator(actualDNSResolver, () -> this.getDefaultFaultDomain(),
                failedToResolveNetworkLocationCounter);
        dnsResolver.setBookieAddressResolver(bookieAddressResolver);
        this.stabilizePeriodSeconds = conf.getNetworkTopologyStabilizePeriodSeconds();
        // create the network topology
        if (stabilizePeriodSeconds > 0) {
            this.topology = new StabilizeNetworkTopology(timer, stabilizePeriodSeconds);
        } else {
            this.topology = new NetworkTopologyImpl();
        }
        try {
            myNode = createDummyLocalBookieNode(InetAddress.getLocalHost().getHostAddress());
            myZone = getZoneAwareNodeLocation(myNode).getZone();
        } catch (IOException e) {
            LOG.error("Failed to get local host address : ", e);
            throw new RuntimeException(e);
        }
        LOG.info("Initialized zoneaware ensemble placement policy @ {} @ {} : {}.", myNode,
                myNode.getNetworkLocation(), dnsResolver.getClass().getName());

        slowBookies = CacheBuilder.newBuilder()
                .expireAfterWrite(conf.getBookieFailureHistoryExpirationMSec(), TimeUnit.MILLISECONDS)
                .build(new CacheLoader<BookieId, Long>() {
                    @Override
                    public Long load(BookieId key) throws Exception {
                        return -1L;
                    }
                });
        return this;
    }