wrapper/src/main/java/software/amazon/jdbc/plugin/failover/ClusterAwareReaderFailoverHandler.java (333 lines of code) (raw):
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.jdbc.plugin.failover;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Logger;
import software.amazon.jdbc.HostRole;
import software.amazon.jdbc.HostSpec;
import software.amazon.jdbc.PluginService;
import software.amazon.jdbc.hostavailability.HostAvailability;
import software.amazon.jdbc.util.Messages;
import software.amazon.jdbc.util.PropertyUtils;
import software.amazon.jdbc.util.Utils;
/**
* An implementation of ReaderFailoverHandler.
*
* <p>Reader Failover Process goal is to connect to any available reader. In order to connect
* faster, this implementation tries to connect to two readers at the same time. The first
* successfully connected reader is returned as the process result. If both readers are unavailable
* (i.e. could not be connected to), the process picks up another pair of readers and repeat. If no
* reader has been connected to, the process may consider a writer host, and other hosts marked
* down, to connect to.
*/
public class ClusterAwareReaderFailoverHandler implements ReaderFailoverHandler {
private static final Logger LOGGER =
Logger.getLogger(ClusterAwareReaderFailoverHandler.class.getName());
protected static final int DEFAULT_FAILOVER_TIMEOUT = 60000; // 60 sec
protected static final int DEFAULT_READER_CONNECT_TIMEOUT = 30000; // 30 sec
public static final ReaderFailoverResult FAILED_READER_FAILOVER_RESULT =
new ReaderFailoverResult(null, null, false);
protected Properties initialConnectionProps;
protected int maxFailoverTimeoutMs;
protected int timeoutMs;
protected boolean isStrictReaderRequired;
protected final PluginService pluginService;
/**
* ClusterAwareReaderFailoverHandler constructor.
*
* @param pluginService A provider for creating new connections.
* @param initialConnectionProps The initial connection properties to copy over to the new reader.
*/
public ClusterAwareReaderFailoverHandler(
final PluginService pluginService,
final Properties initialConnectionProps) {
this(
pluginService,
initialConnectionProps,
DEFAULT_FAILOVER_TIMEOUT,
DEFAULT_READER_CONNECT_TIMEOUT,
false);
}
/**
* ClusterAwareReaderFailoverHandler constructor.
*
* @param pluginService A provider for creating new connections.
* @param initialConnectionProps The initial connection properties to copy over to the new reader.
* @param maxFailoverTimeoutMs Maximum allowed time for the entire reader failover process.
* @param timeoutMs Maximum allowed time in milliseconds for each reader connection attempt during
* the reader failover process.
* @param isStrictReaderRequired When true, it disables adding a writer to a list of nodes to connect
*/
public ClusterAwareReaderFailoverHandler(
final PluginService pluginService,
final Properties initialConnectionProps,
final int maxFailoverTimeoutMs,
final int timeoutMs,
final boolean isStrictReaderRequired) {
this.pluginService = pluginService;
this.initialConnectionProps = initialConnectionProps;
this.maxFailoverTimeoutMs = maxFailoverTimeoutMs;
this.timeoutMs = timeoutMs;
this.isStrictReaderRequired = isStrictReaderRequired;
}
/**
* Set process timeout in millis. Entire process of connecting to a reader will be limited by this
* time duration.
*
* @param timeoutMs Process timeout in millis
*/
protected void setTimeoutMs(final int timeoutMs) {
this.timeoutMs = timeoutMs;
}
/**
* Called to start Reader Failover Process. This process tries to connect to any reader. If no
* reader is available then driver may also try to connect to a writer host, down hosts, and the
* current reader host.
*
* @param hosts Cluster current topology
* @param currentHost The currently connected host that has failed.
* @return {@link ReaderFailoverResult} The results of this process.
*/
@Override
public ReaderFailoverResult failover(final List<HostSpec> hosts, final HostSpec currentHost)
throws SQLException {
if (Utils.isNullOrEmpty(hosts)) {
LOGGER.fine(() -> Messages.get("ClusterAwareReaderFailoverHandler.invalidTopology", new Object[] {"failover"}));
return FAILED_READER_FAILOVER_RESULT;
}
final ExecutorService executor = Executors.newSingleThreadExecutor();
final Future<ReaderFailoverResult> future = submitInternalFailoverTask(hosts, currentHost, executor);
return getInternalFailoverResult(executor, future);
}
private Future<ReaderFailoverResult> submitInternalFailoverTask(
final List<HostSpec> hosts,
final HostSpec currentHost,
final ExecutorService executor) {
final Future<ReaderFailoverResult> future = executor.submit(() -> {
ReaderFailoverResult result;
try {
while (true) {
result = failoverInternal(hosts, currentHost);
if (result != null && result.isConnected()) {
return result;
}
TimeUnit.SECONDS.sleep(1);
}
} catch (final SQLException ex) {
return new ReaderFailoverResult(null, null, false, ex);
} catch (final Exception ex) {
return new ReaderFailoverResult(null, null, false, new SQLException(ex));
}
});
executor.shutdown();
return future;
}
private ReaderFailoverResult getInternalFailoverResult(
final ExecutorService executor,
final Future<ReaderFailoverResult> future) throws SQLException {
try {
final ReaderFailoverResult result = future.get(this.maxFailoverTimeoutMs, TimeUnit.MILLISECONDS);
if (result == null) {
LOGGER.warning(
Messages.get("ClusterAwareReaderFailoverHandler.timeout", new Object[] {this.maxFailoverTimeoutMs}));
return FAILED_READER_FAILOVER_RESULT;
}
return result;
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
throw new SQLException(Messages.get("ClusterAwareReaderFailoverHandler.interruptedThread"), "70100", e);
} catch (final ExecutionException e) {
return FAILED_READER_FAILOVER_RESULT;
} catch (final TimeoutException e) {
future.cancel(true);
return FAILED_READER_FAILOVER_RESULT;
} finally {
if (!executor.isTerminated()) {
executor.shutdownNow(); // terminate all remaining tasks
}
}
}
protected ReaderFailoverResult failoverInternal(
final List<HostSpec> hosts,
final HostSpec currentHost)
throws SQLException {
if (currentHost != null) {
this.pluginService.setAvailability(currentHost.asAliases(), HostAvailability.NOT_AVAILABLE);
}
final List<HostSpec> hostsByPriority = getHostsByPriority(hosts);
return getConnectionFromHostGroup(hostsByPriority);
}
public List<HostSpec> getHostsByPriority(final List<HostSpec> hosts) {
final List<HostSpec> activeReaders = new ArrayList<>();
final List<HostSpec> downHostList = new ArrayList<>();
HostSpec writerHost = null;
for (final HostSpec host : hosts) {
if (host.getRole() == HostRole.WRITER) {
writerHost = host;
continue;
}
if (host.getRawAvailability() == HostAvailability.AVAILABLE) {
activeReaders.add(host);
} else {
downHostList.add(host);
}
}
Collections.shuffle(activeReaders);
Collections.shuffle(downHostList);
final List<HostSpec> hostsByPriority = new ArrayList<>(activeReaders);
final int numOfReaders = activeReaders.size() + downHostList.size();
// Since the writer instance may change during failover, the original writer is likely now a reader. We will include
// it and then verify the role once connected if using "strict-reader".
if (writerHost != null || numOfReaders == 0) {
hostsByPriority.add(writerHost);
}
hostsByPriority.addAll(downHostList);
return hostsByPriority;
}
/**
* Called to get any available reader connection. If no reader is available then result of process
* is unsuccessful. This process will not attempt to connect to the writer.
*
* @param hostList Cluster current topology
* @return {@link ReaderFailoverResult} The results of this process.
*/
@Override
public ReaderFailoverResult getReaderConnection(final List<HostSpec> hostList)
throws SQLException {
if (Utils.isNullOrEmpty(hostList)) {
LOGGER.fine(
() -> Messages.get(
"ClusterAwareReaderFailover.invalidTopology",
new Object[] {"getReaderConnection"}));
return FAILED_READER_FAILOVER_RESULT;
}
final List<HostSpec> hostsByPriority = getReaderHostsByPriority(hostList);
return getConnectionFromHostGroup(hostsByPriority);
}
public List<HostSpec> getReaderHostsByPriority(final List<HostSpec> hosts) {
final List<HostSpec> activeReaders = new ArrayList<>();
final List<HostSpec> downHostList = new ArrayList<>();
HostSpec writerHost = null;
for (final HostSpec host : hosts) {
if (host.getRole() == HostRole.WRITER) {
writerHost = host;
continue;
}
if (host.getRawAvailability() == HostAvailability.AVAILABLE) {
activeReaders.add(host);
} else {
downHostList.add(host);
}
}
Collections.shuffle(activeReaders);
Collections.shuffle(downHostList);
final List<HostSpec> hostsByPriority = new ArrayList<>();
hostsByPriority.addAll(activeReaders);
hostsByPriority.addAll(downHostList);
final int numOfReaders = activeReaders.size() + downHostList.size();
if (writerHost != null && (numOfReaders == 0
|| this.pluginService.getDialect().getFailoverRestrictions()
.contains(FailoverRestriction.ENABLE_WRITER_IN_TASK_B))) {
hostsByPriority.add(writerHost);
}
return hostsByPriority;
}
private ReaderFailoverResult getConnectionFromHostGroup(final List<HostSpec> hosts)
throws SQLException {
final ExecutorService executor = Executors.newFixedThreadPool(2);
final CompletionService<ReaderFailoverResult> completionService = new ExecutorCompletionService<>(executor);
try {
for (int i = 0; i < hosts.size(); i += 2) {
// submit connection attempt tasks in batches of 2
final ReaderFailoverResult result = getResultFromNextTaskBatch(hosts, executor, completionService, i);
if (result.isConnected() || result.getException() != null) {
return result;
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
throw new SQLException(Messages.get("ClusterAwareReaderFailoverHandler.interruptedThread"), "70100", e);
}
}
return new ReaderFailoverResult(
null,
null,
false);
} finally {
executor.shutdownNow();
}
}
private ReaderFailoverResult getResultFromNextTaskBatch(
final List<HostSpec> hosts,
final ExecutorService executor,
final CompletionService<ReaderFailoverResult> completionService,
final int i) throws SQLException {
ReaderFailoverResult result;
final int numTasks = i + 1 < hosts.size() ? 2 : 1;
completionService.submit(new ConnectionAttemptTask(hosts.get(i), this.isStrictReaderRequired));
if (numTasks == 2) {
completionService.submit(new ConnectionAttemptTask(hosts.get(i + 1), this.isStrictReaderRequired));
}
for (int taskNum = 0; taskNum < numTasks; taskNum++) {
result = getNextResult(completionService);
if (result.isConnected()) {
executor.shutdownNow();
return result;
}
if (result.getException() != null) {
executor.shutdownNow();
return result;
}
}
return new ReaderFailoverResult(
null,
null,
false);
}
private ReaderFailoverResult getNextResult(final CompletionService<ReaderFailoverResult> service)
throws SQLException {
try {
final Future<ReaderFailoverResult> future = service.poll(this.timeoutMs, TimeUnit.MILLISECONDS);
if (future == null) {
return FAILED_READER_FAILOVER_RESULT;
}
final ReaderFailoverResult result = future.get();
return result == null ? FAILED_READER_FAILOVER_RESULT : result;
} catch (final ExecutionException e) {
return FAILED_READER_FAILOVER_RESULT;
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
// "Thread was interrupted"
throw new SQLException(
Messages.get(
"ClusterAwareReaderFailoverHandler.interruptedThread"),
"70100",
e);
}
}
private class ConnectionAttemptTask implements Callable<ReaderFailoverResult> {
private final HostSpec newHost;
private final boolean isStrictReaderRequired;
private ConnectionAttemptTask(final HostSpec newHost, final boolean isStrictReaderRequired) {
this.newHost = newHost;
this.isStrictReaderRequired = isStrictReaderRequired;
}
/**
* Call ConnectionAttemptResult.
*/
@Override
public ReaderFailoverResult call() {
LOGGER.fine(
() -> Messages.get(
"ClusterAwareReaderFailoverHandler.attemptingReaderConnection",
new Object[] {this.newHost.getUrl(), PropertyUtils.maskProperties(initialConnectionProps)}));
try {
final Properties copy = new Properties();
copy.putAll(initialConnectionProps);
final Connection conn = pluginService.forceConnect(this.newHost, copy);
pluginService.setAvailability(this.newHost.asAliases(), HostAvailability.AVAILABLE);
if (this.isStrictReaderRequired) {
// need to ensure that new connection is a connection to a reader node
try {
HostRole role = pluginService.getHostRole(conn);
if (!HostRole.READER.equals(role)) {
LOGGER.fine(
Messages.get(
"ClusterAwareReaderFailoverHandler.readerRequired",
new Object[]{ this.newHost.getUrl(), role }));
try {
conn.close();
} catch (final SQLException innerException) {
// ignore
}
return FAILED_READER_FAILOVER_RESULT;
}
} catch (SQLException e) {
LOGGER.fine(Messages.get("ClusterAwareReaderFailoverHandler.errorGettingHostRole", new Object[]{e}));
try {
conn.close();
} catch (final SQLException innerException) {
// ignore
}
return FAILED_READER_FAILOVER_RESULT;
}
}
LOGGER.fine(
() -> Messages.get(
"ClusterAwareReaderFailoverHandler.successfulReaderConnection",
new Object[] {this.newHost.getUrl()}));
LOGGER.fine("New reader failover connection object: " + conn);
return new ReaderFailoverResult(conn, this.newHost, true);
} catch (final SQLException e) {
pluginService.setAvailability(newHost.asAliases(), HostAvailability.NOT_AVAILABLE);
LOGGER.fine(
() -> Messages.get(
"ClusterAwareReaderFailoverHandler.failedReaderConnection",
new Object[] {this.newHost.getUrl()}));
// Propagate exceptions that are not caused by network errors.
if (!pluginService.isNetworkException(e)) {
return new ReaderFailoverResult(
null,
null,
false,
e);
}
return FAILED_READER_FAILOVER_RESULT;
}
}
}
}