in flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandler.java [130:215]
/**
 * Opens one stream per endpoint listed in the given {@link FlightInfo}.
 *
 * <p>An endpoint that reports no locations is read through this handler's own client. For an
 * endpoint that reports locations, the locations are tried in order until one of them yields a
 * stream whose schema can be read; a location using the reuse-connection scheme is served by
 * this handler's own client. A location that fails is cleaned up and the next one is tried.
 *
 * <p>If anything fails, every stream already collected is closed before the error is rethrown.
 *
 * @param flightInfo the flight description whose endpoints should be opened.
 * @return one stream pair per endpoint, in the order the endpoints are reported.
 * @throws SQLException if no location of some endpoint could be used, or any other error occurs;
 *     non-SQL failures are wrapped, and failures from additional locations or from cleanup are
 *     attached as suppressed exceptions.
 */
public List<CloseableEndpointStreamPair> getStreams(final FlightInfo flightInfo)
    throws SQLException {
  final ArrayList<CloseableEndpointStreamPair> endpoints =
      new ArrayList<>(flightInfo.getEndpoints().size());
  try {
    for (FlightEndpoint endpoint : flightInfo.getEndpoints()) {
      if (endpoint.getLocations().isEmpty()) {
        // Create a stream using the current client only and do not close the client at the end.
        endpoints.add(
            new CloseableEndpointStreamPair(
                sqlClient.getStream(endpoint.getTicket(), getOptions()), null));
      } else {
        // Clone the builder and then set the new endpoint on it.
        // GH-38574: Currently a new FlightClient will be made for each partition that returns a
        // non-empty Location then disposed of. It may be better to cache clients because a
        // server may report the same Locations. It would also be good to identify when the
        // reported location is the same as the original connection's Location and skip creating
        // a FlightClient in that scenario.
        final List<Exception> exceptions = new ArrayList<>();
        CloseableEndpointStreamPair stream = null;
        for (Location location : endpoint.getLocations()) {
          final URI endpointUri = location.getUri();
          if (endpointUri.getScheme().equals(LocationSchemes.REUSE_CONNECTION)) {
            stream =
                new CloseableEndpointStreamPair(
                    sqlClient.getStream(endpoint.getTicket(), getOptions()), null);
            break;
          }
          final Builder builderForEndpoint =
              new Builder(ArrowFlightSqlClientHandler.this.builder)
                  .withHost(endpointUri.getHost())
                  .withPort(endpointUri.getPort())
                  .withEncryption(endpointUri.getScheme().equals(LocationSchemes.GRPC_TLS))
                  .withConnectTimeout(builder.connectTimeout);
          ArrowFlightSqlClientHandler endpointHandler = null;
          // Keep the attempt in a local until it is proven usable, so that a failed attempt can
          // never be mistaken for a successful one once the loop ends.
          CloseableEndpointStreamPair candidate = null;
          try {
            endpointHandler = builderForEndpoint.build();
            candidate =
                new CloseableEndpointStreamPair(
                    endpointHandler.sqlClient.getStream(
                        endpoint.getTicket(), endpointHandler.getOptions()),
                    endpointHandler.sqlClient);
            // Make sure we actually get data from the server
            candidate.getStream().getSchema();
          } catch (Exception ex) {
            // Release the failed attempt's stream first, then its handler. Cleanup problems are
            // recorded on the original failure rather than aborting the fallback to the next
            // location.
            if (candidate != null) {
              closeAndSuppress(candidate.getStream(), ex);
            }
            closeAndSuppress(endpointHandler, ex);
            exceptions.add(ex);
            continue;
          }
          stream = candidate;
          break;
        }
        if (stream != null) {
          endpoints.add(stream);
        } else if (exceptions.isEmpty()) {
          // This should never happen...
          throw new IllegalStateException("Could not connect to endpoint and no errors occurred");
        } else {
          // Report the first failure and attach the remaining ones, last failure first.
          final Exception ex = exceptions.get(0);
          for (int i = exceptions.size() - 1; i > 0; i--) {
            ex.addSuppressed(exceptions.get(i));
          }
          throw ex;
        }
      }
    }
  } catch (Exception outerException) {
    // Do not leak the streams that were opened before the failure.
    try {
      AutoCloseables.close(endpoints);
    } catch (Exception innerEx) {
      outerException.addSuppressed(innerEx);
    }
    if (outerException instanceof SQLException) {
      throw (SQLException) outerException;
    }
    throw new SQLException(outerException);
  }
  return endpoints;
}

/**
 * Closes {@code resource} if it is non-null, attaching any failure to {@code primary} as a
 * suppressed exception instead of propagating it.
 */
private static void closeAndSuppress(final AutoCloseable resource, final Exception primary) {
  if (resource == null) {
    return;
  }
  try {
    resource.close();
  } catch (Exception closeEx) {
    // Throwable#addSuppressed rejects self-suppression.
    if (closeEx != primary) {
      primary.addSuppressed(closeEx);
    }
  }
}