in src/main/java/com/google/cloud/spanner/pgadapter/statements/BackendConnection.java [1482:1568]
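  /**
   * Executes the buffered statements starting at {@code fromIndex} as one batch on Cloud Spanner.
   * The batch ends at the first statement that cannot be batched together with the first
   * statement. Returns the number of statements that were executed as part of the batch.
   */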
  int executeStatementsInBatch(int fromIndex) {
    Preconditions.checkArgument(fromIndex < getStatementCount() - 1);
    Preconditions.checkArgument(
        canBeBatchedTogether(getStatementType(fromIndex), getStatementType(fromIndex + 1)));
    Span span = createSpan("execute_batch", null);
    try (Scope ignore = span.makeCurrent()) {
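      // Store the current context (which contains the batch span) for the statements in this
      // batch. It is popped again in the finally block at the end of this method.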
      statementContext.push(Context.current());
      StatementType batchType = getStatementType(fromIndex);
      if (batchType == StatementType.UPDATE) {
        spannerConnection.startBatchDml();
      } else if (batchType == StatementType.DDL) {
        spannerConnection.startBatchDdl();
      }
      List<StatementResult> statementResults = new ArrayList<>(getStatementCount());
      int index = fromIndex;
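      // Add all subsequent statements that can be batched together with the first statement to
      // the batch. The loop stops at the first statement that cannot be added to the batch.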
      try {
        while (index < getStatementCount()) {
          if (!bufferedStatements.get(index).isBatchingPossible()) {
            break;
          }
          StatementType statementType = getStatementType(index);
          if (canBeBatchedTogether(batchType, statementType)) {
            // Send DDL statements to the DdlExecutor instead of executing them directly on the
            // connection, so we can support certain DDL constructs that are currently not
            // supported by the backend, such as IF [NOT] EXISTS.
            if (batchType == StatementType.DDL) {
              statementResults.add(
                  ddlExecutor.execute(
                      bufferedStatements.get(index).parsedStatement,
                      bufferedStatements.get(index).statement));
            } else {
              Execute execute = (Execute) bufferedStatements.get(index);
              execute.analyzeOrExecute(execute.bindStatement(execute.statement, null));
            }
            index++;
          } else {
            // End the batch here, as the statement type at this index cannot be batched together
            // with the other statements in the batch.
            break;
          }
        }
      } catch (Exception exception) {
        // This should normally not happen, as we are not sending any statements to Cloud Spanner
        // yet, but it is done as a safety precaution to ensure that there is always at least one
        // result. Register the exception on the first statement in the batch.
        bufferedStatements.get(fromIndex).result.setException(exception);
        throw exception;
      }
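      // The batch has been built up locally; now run it on Cloud Spanner and record how long
      // the actual execution takes.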
      Span runBatchSpan = createSpan("execute_batch_on_spanner", null);
      try (Scope ignoreRunBatchSpan = runBatchSpan.makeCurrent()) {
        Stopwatch stopwatch = Stopwatch.createStarted();
        long[] counts = spannerConnection.runBatch();
        Duration executionDuration = stopwatch.elapsed();
        metrics.recordClientLibLatency(executionDuration.toMillis(), metricAttributes);
        if (batchType == StatementType.DDL) {
          counts = extractDdlUpdateCounts(statementResults, counts);
        }
        updateBatchResultCount(fromIndex, counts);
      } catch (SpannerBatchUpdateException batchUpdateException) {
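        // The batch failed on one of the statements. The update counts in the exception indicate
        // which statements succeeded; the first statement after those is the one that failed.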
        long[] counts;
        if (batchType == StatementType.DDL) {
          counts = extractDdlUpdateCounts(statementResults, batchUpdateException.getUpdateCounts());
        } else {
          counts = batchUpdateException.getUpdateCounts();
        }
        updateBatchResultCount(fromIndex, counts);
        Execute failedExecute = (Execute) bufferedStatements.get(fromIndex + counts.length);
        failedExecute.result.setException(batchUpdateException);
        runBatchSpan.recordException(batchUpdateException);
        throw batchUpdateException;
      } catch (Throwable exception) {
        bufferedStatements.get(fromIndex).result.setException(exception);
        runBatchSpan.recordException(exception);
        throw exception;
      } finally {
        runBatchSpan.end();
      }
      return index - fromIndex;
    } catch (Throwable throwable) {
      span.recordException(throwable);
      throw throwable;
    } finally {
      span.end();
      statementContext.pop();
    }
  }
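
A minimal sketch of how a caller could use the preconditions and return value of this method, assuming a hypothetical driver method executeBufferedStatements() and a hypothetical executeSingleStatement() fallback (neither is part of the class shown above):

  // Illustrative sketch only (not part of BackendConnection): drive executeStatementsInBatch in a
  // loop over the buffered statements.
  void executeBufferedStatements() {
    int index = 0;
    while (index < getStatementCount()) {
      if (index < getStatementCount() - 1
          && canBeBatchedTogether(getStatementType(index), getStatementType(index + 1))) {
        // Both preconditions of executeStatementsInBatch hold; it returns the number of
        // statements that it consumed, so advance by that amount.
        index += executeStatementsInBatch(index);
      } else {
        // Hypothetical single-statement path for statements that cannot start a batch.
        executeSingleStatement(index);
        index++;
      }
    }
  }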