in grpc-gcp/src/main/java/com/google/cloud/grpc/GcpMultiEndpointChannel.java [405:508]
public synchronized void setMultiEndpoints(List<GcpMultiEndpointOptions> meOptions) {
Preconditions.checkNotNull(meOptions);
Preconditions.checkArgument(!meOptions.isEmpty(), "MultiEndpoints list is empty");
Set<String> currentMultiEndpoints = new HashSet<>();
// Must have all multiendpoints before initializing the pools so that all multiendpoints
// can get status update of every pool.
meOptions.forEach(
options -> {
currentMultiEndpoints.add(options.getName());
// Create or update MultiEndpoint
MultiEndpoint existingMe = multiEndpoints.get(options.getName());
if (existingMe != null) {
updateMetricsForMultiEndpoint(options, existingMe);
existingMe.setEndpoints(options.getEndpoints());
} else {
MultiEndpoint me =
new MultiEndpoint.Builder(options.getEndpoints())
.withRecoveryTimeout(options.getRecoveryTimeout())
.withSwitchingDelay(options.getSwitchingDelay())
.build();
setUpMetricsForMultiEndpoint(options, me);
multiEndpoints.put(options.getName(), me);
}
});
final Set<String> existingPools = new HashSet<>(pools.keySet());
currentEndpoints.clear();
// TODO: Support the same endpoint in different MultiEndpoint to use different channel
// credentials.
// TODO: Support different endpoints in the same MultiEndpoint to use different channel
// credentials.
meOptions.forEach(
options -> {
// Create missing pools
options
.getEndpoints()
.forEach(
endpoint -> {
currentEndpoints.add(endpoint);
pools.computeIfAbsent(
endpoint,
e -> {
ManagedChannelBuilder<?> managedChannelBuilder;
if (options.getChannelCredentials() != null) {
managedChannelBuilder =
Grpc.newChannelBuilder(e, options.getChannelCredentials());
} else {
managedChannelBuilder = channelBuilderForEndpoint(e);
}
if (options.getChannelConfigurator() != null) {
managedChannelBuilder =
options.getChannelConfigurator().apply(managedChannelBuilder);
}
GcpManagedChannel channel =
new GcpManagedChannel(
managedChannelBuilder,
apiConfig,
// Add endpoint to metric labels.
prepareGcpManagedChannelConfig(gcpManagedChannelOptions, e));
// Start monitoring the pool state.
new EndpointStateMonitor(channel, e);
return channel;
});
});
});
existingPools.retainAll(currentEndpoints);
existingPools.forEach(
e -> {
// Communicate current state to MultiEndpoints.
checkPoolState(pools.get(e), e);
});
defaultMultiEndpoint = multiEndpoints.get(meOptions.get(0).getName());
// Remove obsolete multiendpoints.
Iterator<String> iter = multiEndpoints.keySet().iterator();
while (iter.hasNext()) {
String name = iter.next();
if (currentMultiEndpoints.contains(name)) {
continue;
}
removeMetricsForMultiEndpoint(name, multiEndpoints.get(name));
iter.remove();
}
// Shutdown and remove the pools not present in options.
final Set<String> poolsToRemove = new HashSet<>(pools.keySet());
poolsToRemove.removeIf(currentEndpoints::contains);
if (!poolsToRemove.isEmpty()) {
// Get max switching delay.
Optional<Duration> maxDelay =
meOptions
.stream()
.map(GcpMultiEndpointOptions::getSwitchingDelay)
.max(Comparator.naturalOrder());
if (maxDelay.isPresent() && maxDelay.get().toMillis() > 0) {
executor.schedule(
() -> maybeCleanupPools(poolsToRemove), maxDelay.get().toMillis(), MILLISECONDS);
} else {
maybeCleanupPools(poolsToRemove);
}
}
}