in src/main/java/com/google/cloud/spanner/pgadapter/wireprotocol/ControlMessage.java [263:323]
/**
 * Runs every partition of a partitioned query on a dedicated thread pool and returns the total
 * number of rows reported by the partition workers.
 *
 * <p>One {@link SendResultSetRunnable} is submitted per partition. The pool is sized to the number
 * of partitions, capped at eight threads per available processor and never smaller than one
 * thread. Each task is wrapped in a gRPC {@link Context} carrying a call-context configurator that
 * returns a default {@link GrpcCallContext} with a 24-hour timeout.
 *
 * <p>The batch read-only transaction is cleaned up before this method returns, whether it
 * succeeds or fails.
 *
 * @param describedResult the statement whose results are sent; when it is a binary {@link
 *     CopyToStatement}, the workers share a latch with an initial count of one
 * @param mode the query mode passed on to each partition worker
 * @param batchTransactionId identifies the batch read-only transaction that owns the partitions
 * @param partitions the partitions to execute; an empty list yields a row count of zero
 * @return the sum of the row counts returned by all partition workers
 * @throws RuntimeException the exception created by {@link SpannerExceptionFactory} when a worker
 *     fails or when the calling thread is interrupted while waiting for the workers
 */
long sendPartitionedQuery(
    IntermediateStatement describedResult,
    QueryMode mode,
    BatchTransactionId batchTransactionId,
    List<Partition> partitions) {
  // Executors.newFixedThreadPool rejects a pool size of zero with an IllegalArgumentException,
  // which is what an empty partition list would otherwise produce. Always keep one thread.
  int threadCount =
      Math.max(1, Math.min(8 * Runtime.getRuntime().availableProcessors(), partitions.size()));
  ListeningExecutorService executorService =
      MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(threadCount));
  Connection spannerConnection = connection.getSpannerConnection();
  Spanner spanner = spannerConnection.getSpanner();
  BatchClient batchClient = spanner.getBatchClient(connection.getDatabaseId());
  BatchReadOnlyTransaction batchReadOnlyTransaction =
      batchClient.batchReadOnlyTransaction(batchTransactionId);
  try {
    // Every call made from a wrapped task gets a default call context with a 24-hour timeout.
    Context context =
        Context.current()
            .withValue(
                SpannerOptions.CALL_CONTEXT_CONFIGURATOR_KEY,
                new SpannerOptions.CallContextConfigurator() {
                  @Override
                  public <ReqT, RespT> ApiCallContext configure(
                      ApiCallContext context,
                      ReqT request,
                      MethodDescriptor<ReqT, RespT> method) {
                    return GrpcCallContext.createDefault().withTimeout(Duration.ofHours(24L));
                  }
                });
    // The latch starts at one only for binary COPY; otherwise it is already open.
    // NOTE(review): presumably the workers use it to wait until the binary copy header has been
    // sent -- confirm in SendResultSetRunnable.
    CountDownLatch binaryCopyHeaderSentLatch =
        describedResult instanceof CopyToStatement
                && ((CopyToStatement) describedResult).isBinary()
            ? new CountDownLatch(1)
            : new CountDownLatch(0);
    List<ListenableFuture<Long>> futures = new ArrayList<>(partitions.size());
    for (Partition partition : partitions) {
      futures.add(
          executorService.submit(
              context.wrap(
                  SendResultSetRunnable.forPartition(
                      describedResult,
                      batchReadOnlyTransaction,
                      partition,
                      mode,
                      binaryCopyHeaderSentLatch))));
    }
    // No more tasks will be submitted; let the worker threads exit once the queue is drained.
    executorService.shutdown();
    List<Long> rowCounts = Futures.allAsList(futures).get();
    long rowCount = rowCounts.stream().mapToLong(Long::longValue).sum();
    logger.log(Level.INFO, String.format("Sent %d rows from partitioned query", rowCount));
    return rowCount;
  } catch (ExecutionException executionException) {
    logger.log(
        Level.WARNING, "Sending partitioned query result failed", executionException.getCause());
    executorService.shutdownNow();
    throw SpannerExceptionFactory.asSpannerException(executionException.getCause());
  } catch (InterruptedException interruptedException) {
    logger.log(
        Level.WARNING, "Sending partitioned query result interrupted", interruptedException);
    executorService.shutdownNow();
    throw SpannerExceptionFactory.propagateInterrupt(interruptedException);
  } finally {
    // Only reached with the pool still open when setup or task submission failed before
    // shutdown() was called; stop any workers that were already started so they do not leak.
    if (!executorService.isShutdown()) {
      executorService.shutdownNow();
    }
    batchReadOnlyTransaction.cleanup();
  }
}