long sendPartitionedQuery()

in src/main/java/com/google/cloud/spanner/pgadapter/wireprotocol/ControlMessage.java [263:323]


  long sendPartitionedQuery(
      IntermediateStatement describedResult,
      QueryMode mode,
      BatchTransactionId batchTransactionId,
      List<Partition> partitions) {
    ListeningExecutorService executorService =
        MoreExecutors.listeningDecorator(
            Executors.newFixedThreadPool(
                Math.min(8 * Runtime.getRuntime().availableProcessors(), partitions.size())));
    List<ListenableFuture<Long>> futures = new ArrayList<>(partitions.size());
    Connection spannerConnection = connection.getSpannerConnection();
    Spanner spanner = spannerConnection.getSpanner();
    BatchClient batchClient = spanner.getBatchClient(connection.getDatabaseId());
    BatchReadOnlyTransaction batchReadOnlyTransaction =
        batchClient.batchReadOnlyTransaction(batchTransactionId);
    Context context =
        Context.current()
            .withValue(
                SpannerOptions.CALL_CONTEXT_CONFIGURATOR_KEY,
                new SpannerOptions.CallContextConfigurator() {
                  @Override
                  public <ReqT, RespT> ApiCallContext configure(
                      ApiCallContext context, ReqT request, MethodDescriptor<ReqT, RespT> method) {
                    return GrpcCallContext.createDefault().withTimeout(Duration.ofHours(24L));
                  }
                });
    CountDownLatch binaryCopyHeaderSentLatch =
        describedResult instanceof CopyToStatement && ((CopyToStatement) describedResult).isBinary()
            ? new CountDownLatch(1)
            : new CountDownLatch(0);
    for (Partition partition : partitions) {
      futures.add(
          executorService.submit(
              context.wrap(
                  SendResultSetRunnable.forPartition(
                      describedResult,
                      batchReadOnlyTransaction,
                      partition,
                      mode,
                      binaryCopyHeaderSentLatch))));
    }
    executorService.shutdown();
    try {
      List<Long> rowCounts = Futures.allAsList(futures).get();
      long rowCount = rowCounts.stream().reduce(Long::sum).orElse(0L);
      logger.log(Level.INFO, String.format("Sent %d rows from partitioned query", rowCount));
      return rowCount;
    } catch (ExecutionException executionException) {
      logger.log(
          Level.WARNING, "Sending partitioned query result failed", executionException.getCause());
      executorService.shutdownNow();
      throw SpannerExceptionFactory.asSpannerException(executionException.getCause());
    } catch (InterruptedException interruptedException) {
      logger.log(
          Level.WARNING, "Sending partitioned query result interrupted", interruptedException);
      executorService.shutdownNow();
      throw SpannerExceptionFactory.propagateInterrupt(interruptedException);
    } finally {
      batchReadOnlyTransaction.cleanup();
    }
  }