in src/main/java/org/mariadb/jdbc/internal/protocol/AbstractMultiSend.java [241:385]
/**
 * Sends the whole batch to the server in successive bulks and collects the results of each bulk
 * before the next one is sent.
 *
 * <p>A bulk holds at most {@code useBatchMultiSendNumber} commands. When {@code
 * readPrepareStmtResult} is set and no prepare result is known yet, the statement is prepared
 * (and the prepare response read) before the first command of the bulk is sent.
 *
 * <p>After the first command of a bulk has been sent, an {@code AsyncMultiRead} task is submitted
 * to {@code readScheduler}; its outcome is awaited once the whole bulk has been sent. If the
 * scheduler rejects the task, results are read on the calling thread after each command instead.
 *
 * <p>A failure reported while reading results is thrown immediately when a prepare result is
 * required but still missing, or when {@code continueBatchOnError} is disabled. Otherwise the
 * most recent failure is kept and thrown after every command has been sent.
 *
 * @param estimatedParameterCount parameter count handed to {@code sendCmd} until a prepare
 *     result provides the value to use
 * @return the current prepare result (whatever {@code prepareResult} holds after the last bulk)
 * @throws SQLException if reading results fails (see policy above), if the reader task fails or
 *     is interrupted, or if an I/O error occurs while sending; a {@link SQLTimeoutException} is
 *     thrown when the protocol reports it was interrupted during the batch
 */
private PrepareResult executeBatchStandard(int estimatedParameterCount) throws SQLException {
  int totalExecutionNumber = getTotalExecutionNumber();
  SQLException exception = null;
  BulkStatus status = new BulkStatus();
  ComStmtPrepare comStmtPrepare = null;
  FutureTask<AsyncMultiReadResult> futureReadTask = null;
  int paramCount = estimatedParameterCount;

  try {
    do {
      status.sendEnded = false;
      status.sendSubCmdCounter = 0;
      int requestNumberByBulk =
          Math.min(
              totalExecutionNumber - status.sendCmdCounter,
              protocol.getOptions().useBatchMultiSendNumber);
      protocol.changeSocketTcpNoDelay(false); // enable NAGLE algorithm temporarily.

      // add prepare sub-command
      if (readPrepareStmtResult && prepareResult == null) {
        comStmtPrepare = new ComStmtPrepare(protocol, sql);
        comStmtPrepare.send(writer);

        // read prepare result
        prepareResult = comStmtPrepare.read(protocol.getReader(), protocol.isEofDeprecated());
        statementId = ((ServerPrepareResult) prepareResult).getStatementId();
        paramCount = getParamCount();
      }

      // becomes true when the scheduler refuses the reader task for this bulk
      boolean useCurrentThread = false;

      while (status.sendSubCmdCounter < requestNumberByBulk) {
        sendCmd(writer, results, parametersList, queries, paramCount, status, prepareResult);
        status.sendSubCmdCounter++;
        status.sendCmdCounter++;

        if (useCurrentThread) {
          try {
            protocol.getResult(results);
          } catch (SQLException qex) {
            exception = deferOrRethrowBatchError(qex);
          }
        } else if (futureReadTask == null) {
          // first command of the bulk: start the reader task
          try {
            futureReadTask =
                new FutureTask<>(
                    new AsyncMultiRead(
                        comStmtPrepare,
                        status,
                        protocol,
                        false,
                        this,
                        paramCount,
                        results,
                        parametersList,
                        queries,
                        prepareResult));
            readScheduler.execute(futureReadTask);
          } catch (RejectedExecutionException rejected) {
            // no reader thread available: read results synchronously for the rest of this bulk
            useCurrentThread = true;
            try {
              protocol.getResult(results);
            } catch (SQLException qex) {
              exception = deferOrRethrowBatchError(qex);
            }
          }
        }
      }

      status.sendEnded = true;

      // Restore the configured TCP_NODELAY value whichever thread read the results, so the
      // "temporary" Nagle activation above never outlives the bulk.
      protocol.changeSocketTcpNoDelay(protocol.getOptions().tcpNoDelay);

      // futureReadTask is null when the bulk contained no command: nothing to wait for then
      if (!useCurrentThread && futureReadTask != null) {
        try {
          AsyncMultiReadResult asyncMultiReadResult = futureReadTask.get();

          if (binaryProtocol
              && prepareResult == null
              && asyncMultiReadResult.getPrepareResult() != null) {
            prepareResult = asyncMultiReadResult.getPrepareResult();
            statementId = ((ServerPrepareResult) prepareResult).getStatementId();
            paramCount = prepareResult.getParamCount();
          }

          if (asyncMultiReadResult.getException() != null) {
            exception = deferOrRethrowBatchError(asyncMultiReadResult.getException());
          }
        } catch (ExecutionException executionException) {
          // keep the original failure attached so its stack trace is not lost
          Throwable cause = executionException.getCause();
          if (cause == null) {
            throw new SQLException(
                "Error reading results " + executionException.getMessage(), executionException);
          }
          throw new SQLException("Error reading results " + cause.getMessage(), cause);
        } catch (InterruptedException interruptedException) {
          protocol.setActiveFutureTask(futureReadTask);
          Thread.currentThread().interrupt();
          throw new SQLException(
              "Interrupted awaiting response ",
              INTERRUPTED_EXCEPTION.getSqlState(),
              interruptedException);
        } finally {
          // A bulk can prepare a statement and, if the prepare cache is enabled, replace an
          // already cached prepared statement. This releases those old prepared statements
          // without conflict.
          protocol.forceReleaseWaitingPrepareStatement();
        }
      }

      if (protocol.isInterrupted()) {
        // interrupted during read, must throw an exception manually
        if (futureReadTask != null) {
          futureReadTask.cancel(true);
        }
        throw new SQLTimeoutException("Timeout during batch execution");
      }

      futureReadTask = null;
    } while (status.sendCmdCounter < totalExecutionNumber);

    if (exception != null) {
      throw exception;
    }
    return prepareResult;

  } catch (IOException e) {
    status.sendEnded = true;
    status.sendCmdCounter = 0; // to ensure read doesn't hang
    throw protocol.handleIoException(e);
  }
}

/**
 * Applies the batch error policy to a failure raised while reading results: rethrows it when a
 * prepare result is required but missing or when {@code continueBatchOnError} is disabled,
 * otherwise hands it back so the caller can keep it and carry on with the batch.
 *
 * @param readError failure raised while reading a result
 * @return {@code readError}, when the batch is allowed to continue
 * @throws SQLException {@code readError} itself, when the batch must stop
 */
private SQLException deferOrRethrowBatchError(SQLException readError) throws SQLException {
  boolean prepareMissing = readPrepareStmtResult && prepareResult == null;
  if (prepareMissing || !protocol.getOptions().continueBatchOnError) {
    throw readError;
  }
  return readError;
}