in src/main/java/com/google/cloud/spanner/pgadapter/utils/MutationWriter.java [237:372]
public StatementResult call() throws Exception {
  PipedInputStream inputStream = new PipedInputStream(payload, copySettings.getPipeBufferSize());
  pipeCreatedLatch.countDown();
  final CopyInParser parser =
      CopyInParser.create(
          copySettings.getSessionState(), copyFormat, csvFormat, inputStream, hasHeader);
  // This LinkedBlockingDeque holds a reference to all transactions that are currently active. The
  // max capacity of this deque is what ensures that we never have more than maxParallelism
  // transactions running at the same time. We could also achieve that with a thread pool that has
  // a fixed number of threads, but Java does not have a thread pool implementation that blocks
  // when a new task is offered while all threads are in use; the only options are to fail or to
  // add the task to a queue. We want to block our worker thread when the maximum number of
  // parallel transactions has been reached, as that automatically creates back-pressure in the
  // entire pipeline, which consists of:
  // Client app (psql) -> CopyData message -> CSVParser -> Transaction.
  // A short sketch of how the deque is used follows the declaration below.
  LinkedBlockingDeque<ApiFuture<Void>> activeCommitFutures =
      new LinkedBlockingDeque<>(copySettings.getMaxParallelism());
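  // Illustrative sketch only: the helpers that actually use this deque (such as
  // writeToSpannerAsync and addMutationAndMaybeFlushTransaction) are outside this excerpt, so the
  // calls below are assumptions rather than the verified implementation.
  //   activeCommitFutures.put(commitFuture);   // blocks once maxParallelism futures are in flight
  //   commitFuture completes -> activeCommitFutures.remove(commitFuture);
  // A slow backend therefore stalls this worker thread instead of letting unbounded work pile up.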
  // This list holds all transactions that we have started. We will wait on this entire list
  // before finishing, to ensure that all data has been written before we signal that we are done.
  List<ApiFuture<Void>> allCommitFutures = new ArrayList<>();
  try {
    // Wait until we know whether we will actually receive any data. This could be an empty copy
    // operation, in which case we should end early.
    dataReceivedLatch.await();
    Iterator<CopyRecord> iterator = parser.iterator();
    List<Mutation> mutations = new ArrayList<>();
    long currentBufferByteSize = 0L;
    // Note: iterator.hasNext() blocks if there is not enough data in the pipeline to construct a
    // complete record. It returns false if the stream has been closed and all records have been
    // returned.
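    // For context (the producer side lives elsewhere in this class and is not shown in this
    // excerpt, so treat the wiring described here as an assumption): incoming CopyData payloads
    // are written to the PipedOutputStream 'payload' that feeds 'inputStream', and closing
    // 'payload' once the client has finished sending data is what eventually makes
    // iterator.hasNext() return false.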
    while (bytesReceived.get() > 0L && !rollback.get() && iterator.hasNext()) {
      CopyRecord record = iterator.next();
      if (record.isEndRecord()) {
        break;
      }
      if (record.numColumns() != this.tableColumns.keySet().size()) {
        throw PGExceptionFactory.newPGException(
            String.format(
                "Invalid COPY data: Row length mismatch. Expected %d values, but got %d.",
                this.tableColumns.keySet().size(), record.numColumns()),
            SQLState.DataException);
      }
      Mutation mutation = buildMutation(record);
      int mutationSize = calculateSize(mutation);
      this.rowCount++;
      if (transactionMode == CopyTransactionMode.ImplicitNonAtomic) {
        currentBufferByteSize =
            addMutationAndMaybeFlushTransaction(
                activeCommitFutures,
                allCommitFutures,
                mutations,
                mutation,
                currentBufferByteSize,
                mutationSize);
      } else {
        mutations.add(mutation);
        currentBufferByteSize += mutationSize;
        if (mutations.size() > maxAtomicBatchSize) {
          throw SpannerExceptionFactory.newSpannerException(
              ErrorCode.FAILED_PRECONDITION,
              "Record count: "
                  + mutations.size()
                  + " has exceeded the limit: "
                  + maxAtomicBatchSize
                  + ".\n\nThe number of mutations per record is equal to the number of columns in the record "
                  + "plus the number of indexed columns in the record. The maximum number of mutations "
                  + "in one transaction is "
                  + copySettings.getMaxAtomicMutationsLimit()
                  + ".\n\nExecute `SET SPANNER.AUTOCOMMIT_DML_MODE='PARTITIONED_NON_ATOMIC'` before executing a large COPY operation "
                  + "to instruct PGAdapter to automatically break large transactions into multiple smaller transactions. "
                  + "This will make the COPY operation non-atomic.\n\n");
        }
        if (currentBufferByteSize > copySettings.getMaxAtomicCommitSize()) {
          throw SpannerExceptionFactory.newSpannerException(
              ErrorCode.FAILED_PRECONDITION,
              "Commit size: "
                  + currentBufferByteSize
                  + " has exceeded the limit: "
                  + copySettings.getMaxAtomicCommitSize()
                  + ".\n\nExecute `SET SPANNER.AUTOCOMMIT_DML_MODE='PARTITIONED_NON_ATOMIC'` before executing a large COPY operation "
                  + "to instruct PGAdapter to automatically break large transactions into multiple smaller transactions. "
                  + "This will make the COPY operation non-atomic.\n\n");
        }
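        // For illustration, the client-side remedy suggested by the two error messages above (the
        // COPY statement here is an assumed example, not taken from this code):
        //   SET SPANNER.AUTOCOMMIT_DML_MODE='PARTITIONED_NON_ATOMIC';
        //   COPY my_table FROM STDIN;
        // With that setting the operation presumably takes the ImplicitNonAtomic branch above and
        // is flushed in smaller, non-atomic batches instead of one atomic commit.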
      }
    } // end of while(iterator.hasNext())
    // There are no more records in the pipeline.
    // Write any remaining mutations in the buffer.
    if (!rollback.get() && !mutations.isEmpty()) {
      if (transactionMode == CopyTransactionMode.Explicit) {
        connection.write(mutations);
      } else {
        // Wait until we have received a CopyDone message before writing the remaining data to
        // Spanner. If we are in a non-atomic transaction, some data might already have been
        // written to Spanner.
        closedLatch.await();
        if (commit.get()) {
          allCommitFutures.add(writeToSpannerAsync(activeCommitFutures, mutations));
        }
      }
    }
    // Wait for all commits to finish. We do this even if something went wrong, as it ensures two
    // things:
    // 1. All commits that were in flight when something went wrong will finish before we return
    //    an error to the client application. This prevents commits from still being added to the
    //    database after we have returned an error, which could cause confusion.
    // 2. This will throw the underlying exception, so we can catch and register it.
    ApiFutures.allAsList(allCommitFutures).get();
  } catch (SpannerException e) {
    synchronized (lock) {
      this.exception = PGExceptionFactory.toPGException(e);
      throw this.exception;
    }
  } catch (ExecutionException e) {
    synchronized (lock) {
      this.exception = PGExceptionFactory.toPGException(e.getCause());
      throw this.exception;
    }
  } catch (Exception e) {
    synchronized (lock) {
      this.exception = PGExceptionFactory.toPGException(e);
      throw this.exception;
    }
  } finally {
    this.executorService.shutdown();
    if (!this.executorService.awaitTermination(60L, TimeUnit.SECONDS)) {
      logger.log(
          Level.WARNING, "Timeout while waiting for MutationWriter executor to shut down.");
    }
    this.payload.close();
    parser.close();
  }
  return new UpdateCount(rowCount);
}
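
// Rough usage sketch for call() above. Everything except call() itself (the executor, how the
// CopyData bytes reach 'payload', and how the latches are released) lives outside this excerpt,
// so the steps below are illustrative assumptions rather than the verified API of this class.
//   MutationWriter writer = ...;                              // configured for the target table
//   Future<StatementResult> result = executor.submit(writer); // runs call() on a worker thread
//   // CopyData messages from the client are pushed into 'payload' and release dataReceivedLatch;
//   // CopyDone (or an error) closes the stream and releases closedLatch.
//   StatementResult updateCount = result.get();               // new UpdateCount(rowCount) on success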