in bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ZoneawareEnsemblePlacementPolicyImpl.java [181:269]
public EnsemblePlacementPolicy initialize(ClientConfiguration conf,
        Optional<DNSToSwitchMapping> optionalDnsResolver, HashedWheelTimer timer, FeatureProvider featureProvider,
        StatsLogger statsLogger, BookieAddressResolver bookieAddressResolver) {
    // Initializes this zone-aware placement policy from the client configuration.
    //
    // Steps, in order:
    //   1. store the collaborators and create the join/leave/resolve-failure stats;
    //   2. read the weighting and zone settings from conf and validate them;
    //   3. pick the DNS resolver (the supplied one, or one instantiated reflectively from
    //      REPP_DNS_RESOLVER_CLASS) and wrap it in a DNSResolverDecorator;
    //   4. create the network topology and resolve the local node and its zone;
    //   5. build the slow-bookie cache;
    //   6. register the "writable bookies in default fault domain" gauge.
    //
    // Parameters:
    //   conf                  - source of every tunable read below
    //   optionalDnsResolver   - resolver to use as-is when present; otherwise one is created from conf
    //   timer                 - stored, and handed to StabilizeNetworkTopology when a stabilize period is set
    //   featureProvider       - not used by this method
    //   statsLogger           - used to create the stats and to register the gauge
    //   bookieAddressResolver - stored, and passed to the DNS resolver(s)
    //
    // Returns this policy instance.
    //
    // Throws IllegalArgumentException when minNumZonesPerWriteQuorum is greater than
    // desiredNumZonesPerWriteQuorum, and RuntimeException (wrapping the IOException) when the
    // local host address or local node cannot be determined.
    this.statsLogger = statsLogger;
    this.bookieAddressResolver = bookieAddressResolver;
    this.timer = timer;
    this.bookiesJoinedCounter = statsLogger.getOpStatsLogger(BOOKIES_JOINED);
    this.bookiesLeftCounter = statsLogger.getOpStatsLogger(BOOKIES_LEFT);
    this.failedToResolveNetworkLocationCounter = statsLogger.getCounter(FAILED_TO_RESOLVE_NETWORK_LOCATION_COUNT);

    this.reorderThresholdPendingRequests = conf.getReorderThresholdPendingRequests();
    this.isWeighted = conf.getDiskWeightBasedPlacementEnabled();
    if (this.isWeighted) {
        this.maxWeightMultiple = conf.getBookieMaxWeightMultipleForWeightBasedPlacement();
        this.weightedSelection = new DynamicWeightedRandomSelectionImpl<>(this.maxWeightMultiple);
        LOG.info("Weight based placement with max multiple of {}", this.maxWeightMultiple);
    } else {
        LOG.info("Not weighted");
    }

    this.minNumZonesPerWriteQuorum = conf.getMinNumZonesPerWriteQuorum();
    this.desiredNumZonesPerWriteQuorum = conf.getDesiredNumZonesPerWriteQuorum();
    this.enforceStrictZoneawarePlacement = conf.getEnforceStrictZoneawarePlacement();
    // The minimum number of zones may not exceed the desired number; reject the configuration outright.
    if (minNumZonesPerWriteQuorum > desiredNumZonesPerWriteQuorum) {
        LOG.error(
                "It is misconfigured, for ZoneawareEnsemblePlacementPolicy, minNumZonesPerWriteQuorum: {} cann't be"
                        + " greater than desiredNumZonesPerWriteQuorum: {}",
                minNumZonesPerWriteQuorum, desiredNumZonesPerWriteQuorum);
        throw new IllegalArgumentException("minNumZonesPerWriteQuorum: " + minNumZonesPerWriteQuorum
                + " cann't be greater than desiredNumZonesPerWriteQuorum: " + desiredNumZonesPerWriteQuorum);
    }

    DNSToSwitchMapping actualDNSResolver;
    if (optionalDnsResolver.isPresent()) {
        actualDNSResolver = optionalDnsResolver.get();
    } else {
        // No resolver supplied: instantiate the configured class, defaulting to ScriptBasedMapping.
        String dnsResolverName = conf.getString(REPP_DNS_RESOLVER_CLASS, ScriptBasedMapping.class.getName());
        actualDNSResolver = ReflectionUtils.newInstance(dnsResolverName, DNSToSwitchMapping.class);
        actualDNSResolver.setBookieAddressResolver(bookieAddressResolver);
        if (actualDNSResolver instanceof Configurable) {
            ((Configurable) actualDNSResolver).setConf(conf);
        }
    }
    // The decorator is given the default fault domain supplier and the resolve-failure counter.
    this.dnsResolver = new DNSResolverDecorator(actualDNSResolver, this::getDefaultFaultDomain,
            failedToResolveNetworkLocationCounter);
    dnsResolver.setBookieAddressResolver(bookieAddressResolver);

    this.stabilizePeriodSeconds = conf.getNetworkTopologyStabilizePeriodSeconds();
    // create the network topology
    if (stabilizePeriodSeconds > 0) {
        this.topology = new StabilizeNetworkTopology(timer, stabilizePeriodSeconds);
    } else {
        this.topology = new NetworkTopologyImpl();
    }

    try {
        myNode = createDummyLocalBookieNode(InetAddress.getLocalHost().getHostAddress());
        myZone = getZoneAwareNodeLocation(myNode).getZone();
    } catch (IOException e) {
        LOG.error("Failed to get local host address : ", e);
        throw new RuntimeException(e);
    }
    LOG.info("Initialized zoneaware ensemble placement policy @ {} @ {} : {}.", myNode,
            myNode.getNetworkLocation(), dnsResolver.getClass().getName());

    // Cache of bookies keyed by id; entries expire after the configured failure-history window
    // and a missing key loads as -1.
    slowBookies = CacheBuilder.newBuilder()
            .expireAfterWrite(conf.getBookieFailureHistoryExpirationMSec(), TimeUnit.MILLISECONDS)
            .build(new CacheLoader<BookieId, Long>() {
                @Override
                public Long load(BookieId key) throws Exception {
                    return -1L;
                }
            });

    // Register the gauge last. Its getSample() dereferences topology, which is only assigned
    // above, so publishing it earlier would let a stats reporter sample it before the topology
    // exists; it would also stay registered if one of the checks above threw.
    this.numWritableBookiesInDefaultFaultDomain = new Gauge<Integer>() {
        @Override
        public Integer getDefaultValue() {
            return 0;
        }

        @Override
        public Integer getSample() {
            // Read the topology under the read lock.
            rwLock.readLock().lock();
            try {
                return topology.countNumOfAvailableNodes(getDefaultFaultDomain(), Collections.emptySet());
            } finally {
                rwLock.readLock().unlock();
            }
        }
    };
    this.statsLogger.registerGauge(NUM_WRITABLE_BOOKIES_IN_DEFAULT_FAULTDOMAIN,
            numWritableBookiesInDefaultFaultDomain);
    return this;
}