public List getStreams()

in flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandler.java [130:215]


  public List<CloseableEndpointStreamPair> getStreams(final FlightInfo flightInfo)
      throws SQLException {
    final ArrayList<CloseableEndpointStreamPair> endpoints =
        new ArrayList<>(flightInfo.getEndpoints().size());

    try {
      for (FlightEndpoint endpoint : flightInfo.getEndpoints()) {
        if (endpoint.getLocations().isEmpty()) {
          // Create a stream using the current client only and do not close the client at the end.
          endpoints.add(
              new CloseableEndpointStreamPair(
                  sqlClient.getStream(endpoint.getTicket(), getOptions()), null));
        } else {
          // Clone the builder and then set the new endpoint on it.

          // GH-38574: Currently a new FlightClient will be made for each partition that returns a
          // non-empty Location then disposed of. It may be better to cache clients because a server
          // may report the same Locations. It would also be good to identify when the reported
          // location
          // is the same as the original connection's Location and skip creating a FlightClient in
          // that scenario.
          List<Exception> exceptions = new ArrayList<>();
          CloseableEndpointStreamPair stream = null;
          for (Location location : endpoint.getLocations()) {
            final URI endpointUri = location.getUri();
            if (endpointUri.getScheme().equals(LocationSchemes.REUSE_CONNECTION)) {
              stream =
                  new CloseableEndpointStreamPair(
                      sqlClient.getStream(endpoint.getTicket(), getOptions()), null);
              break;
            }
            final Builder builderForEndpoint =
                new Builder(ArrowFlightSqlClientHandler.this.builder)
                    .withHost(endpointUri.getHost())
                    .withPort(endpointUri.getPort())
                    .withEncryption(endpointUri.getScheme().equals(LocationSchemes.GRPC_TLS))
                    .withConnectTimeout(builder.connectTimeout);

            ArrowFlightSqlClientHandler endpointHandler = null;
            try {
              endpointHandler = builderForEndpoint.build();
              stream =
                  new CloseableEndpointStreamPair(
                      endpointHandler.sqlClient.getStream(
                          endpoint.getTicket(), endpointHandler.getOptions()),
                      endpointHandler.sqlClient);
              // Make sure we actually get data from the server
              stream.getStream().getSchema();
            } catch (Exception ex) {
              if (endpointHandler != null) {
                AutoCloseables.close(endpointHandler);
              }
              exceptions.add(ex);
              continue;
            }

            break;
          }
          if (stream != null) {
            endpoints.add(stream);
          } else if (exceptions.isEmpty()) {
            // This should never happen...
            throw new IllegalStateException("Could not connect to endpoint and no errors occurred");
          } else {
            Exception ex = exceptions.remove(0);
            while (!exceptions.isEmpty()) {
              ex.addSuppressed(exceptions.remove(exceptions.size() - 1));
            }
            throw ex;
          }
        }
      }
    } catch (Exception outerException) {
      try {
        AutoCloseables.close(endpoints);
      } catch (Exception innerEx) {
        outerException.addSuppressed(innerEx);
      }

      if (outerException instanceof SQLException) {
        throw (SQLException) outerException;
      }
      throw new SQLException(outerException);
    }
    return endpoints;
  }