in src/main/java/org/mariadb/jdbc/internal/protocol/AbstractQueryProtocol.java [449:623]
private boolean executeBulkBatch(
    Results results,
    String sql,
    ServerPrepareResult serverPrepareResult,
    final List<ParameterHolder[]> parametersList)
    throws SQLException {
  // **************************************************************************************
  // Tries to execute the whole batch with COM_STMT_BULK_EXECUTE.
  //
  // Returns true once every parameter set has been sent and read back through BULK.
  // Returns false when BULK is not applicable:
  // - a parameter type differs between two parameter sets
  // - the SQL text contains "select" (avoids INSERT FROM SELECT)
  // - the server answers with error 1295 / SQLState HY000
  //
  // Only the checks above are done here. Other preconditions (server version >= 10.2.7,
  // no stream parameter) are presumably verified by the caller - TODO confirm.
  //
  // parametersList must contain at least one parameter set: the first one defines the
  // parameter count and the types sent to the server.
  // **************************************************************************************
  ParameterHolder[] firstParameters = parametersList.get(0);
  int parameterCount = firstParameters.length;
  short[] types = new short[parameterCount];
  for (int i = 0; i < parameterCount; i++) {
    types[i] = firstParameters[i].getColumnType().getType();
  }

  // types are sent once per packet, so every parameter set must use the same types
  for (ParameterHolder[] row : parametersList) {
    for (int i = 0; i < parameterCount; i++) {
      if (row[i].getColumnType().getType() != types[i]) {
        return false;
      }
    }
  }

  // any select query is not applicable to bulk
  if (sql.toLowerCase(Locale.ROOT).contains("select")) {
    return false;
  }

  cmdPrologue();

  // last parameter set handled, kept for the error message if an IOException occurs
  ParameterHolder[] parameters = null;
  ServerPrepareResult tmpServerPrepareResult = serverPrepareResult;

  try {
    // first error met; later errors are dropped when continueBatchOnError is set
    SQLException exception = null;

    // **************************************************************************************
    // send PREPARE if needed
    // **************************************************************************************
    if (serverPrepareResult == null) {
      tmpServerPrepareResult = prepare(sql, true);
    }

    // **************************************************************************************
    // send BULK
    // **************************************************************************************
    int statementId =
        tmpServerPrepareResult != null ? tmpServerPrepareResult.getStatementId() : -1;

    // parameter set that did not fit in the previous packet (null if none is pending)
    byte[] lastCmdData = null;
    int index = 0;

    do {
      writeBulkExecuteHeader(statementId, types);

      if (lastCmdData != null) {
        // start this packet with the parameter set left over from the previous one
        writer.checkMaxAllowedLength(lastCmdData.length);
        writer.write(lastCmdData);
        writer.mark();
        index++;
        lastCmdData = null;
      }

      for (; index < parametersList.size(); index++) {
        parameters = parametersList.get(index);
        for (int i = 0; i < parameterCount; i++) {
          ParameterHolder holder = parameters[i];
          if (holder.isNullData()) {
            writer.write(1); // NULL
          } else {
            writer.write(0); // NONE
            holder.writeBinary(writer);
          }
        }

        // if buffer > MAX_ALLOWED_PACKET, flush until last mark.
        if (writer.exceedMaxLength() && writer.isMarked()) {
          writer.flushBufferStopAtMark();
        }

        // if flushed, quit loop
        if (writer.bufferIsDataAfterMark()) {
          break;
        }

        writer.checkMaxAllowedLength(0);
        writer.mark();
      }

      if (writer.bufferIsDataAfterMark()) {
        // flush has been done: keep what resetMark() returns (presumably the bytes written
        // after the mark, i.e. the current parameter set) for the next packet
        lastCmdData = writer.resetMark();
      } else {
        writer.flush();
        writer.resetMark();
      }

      try {
        getResult(results);
      } catch (SQLException sqle) {
        if ("HY000".equals(sqle.getSQLState()) && sqle.getErrorCode() == 1295) {
          // query contain commands that cannot be handled by BULK protocol
          // clear error and special error code, so it won't leak anywhere
          // and wouldn't be misinterpreted as an additional update count
          results.getCmdInformation().reset();
          return false;
        }
        if (exception == null) {
          exception = exceptionWithQuery(sql, sqle, explicitClosed);
          if (!options.continueBatchOnError) {
            throw exception;
          }
        }
      }
    } while (index < parametersList.size() - 1);

    // the very last parameter set did not fit in the previous packet: send it alone
    if (lastCmdData != null) {
      writeBulkExecuteHeader(statementId, types);
      writer.write(lastCmdData);
      writer.flush();
      try {
        getResult(results);
      } catch (SQLException sqle) {
        if ("HY000".equals(sqle.getSQLState()) && sqle.getErrorCode() == 1295) {
          // query contain SELECT. cannot be handle by BULK protocol
          // NOTE(review): unlike the loop above, the command information is not reset here
          return false;
        }
        if (exception == null) {
          exception = exceptionWithQuery(sql, sqle, explicitClosed);
          if (!options.continueBatchOnError) {
            throw exception;
          }
        }
      }
    }

    if (exception != null) {
      throw exception;
    }

    results.setRewritten(true);
    return true;

  } catch (IOException e) {
    throw exceptionWithQuery(
        parameters, tmpServerPrepareResult, handleIoException(e), explicitClosed);
  } finally {
    // release the statement only if it was prepared by this method
    if (serverPrepareResult == null && tmpServerPrepareResult != null) {
      releasePrepareStatement(tmpServerPrepareResult);
    }
    writer.resetMark();
  }
}

/**
 * Starts a new COM_STMT_BULK_EXECUTE packet and writes its header: command byte, statement
 * id, bulk flags and one type code per parameter.
 *
 * <p>The flags value is the short 128 (SEND_TYPES_TO_SERVER). It must not be written as
 * {@code (byte) 0x80}: that byte is -128 and widens to the short 0xFF80.
 *
 * @param statementId server prepared statement id
 * @param types type code of each parameter, in parameter order
 * @throws IOException if writing to the output stream fails
 */
private void writeBulkExecuteHeader(int statementId, short[] types) throws IOException {
  writer.startPacket(0);
  writer.write(COM_STMT_BULK_EXECUTE);
  writer.writeInt(statementId);
  writer.writeShort((short) 128); // always SEND_TYPES_TO_SERVER
  for (short type : types) {
    writer.writeShort(type);
  }
}