private boolean executeBulkBatch()

in src/main/java/org/mariadb/jdbc/internal/protocol/AbstractQueryProtocol.java [449:623]


  /**
   * Tries to execute a whole batch with the COM_STMT_BULK_EXECUTE command.
   *
   * <p>The parameter rows are streamed into as few bulk packets as possible. When adding a row
   * makes the writer exceed its maximum length, everything up to the previous row is flushed, the
   * server answer is read, and the overflowing row becomes the first row of the next packet.
   *
   * @param results object receiving the server answers; flagged as rewritten on success
   * @param sql query text, used for the "select" exclusion, for preparing when needed and for
   *     error reporting
   * @param serverPrepareResult already prepared statement, or {@code null} to prepare {@code sql}
   *     here (in that case the statement is released again before returning)
   * @param parametersList one parameter array per batch row; the first row defines the parameter
   *     count and the types sent to the server
   * @return {@code true} when the batch was executed through the bulk command; {@code false} when
   *     bulk cannot be used (a parameter type differs between rows, the query contains "select",
   *     or the server answered with SQLState HY000 / error code 1295)
   * @throws SQLException the first server error encountered (thrown immediately unless
   *     {@code options.continueBatchOnError} is set, otherwise after all packets were sent), or
   *     the error produced from an I/O failure
   */
  private boolean executeBulkBatch(
      Results results,
      String sql,
      ServerPrepareResult serverPrepareResult,
      final List<ParameterHolder[]> parametersList)
      throws SQLException {

    // **************************************************************************************
    // Ensure BULK can be used :
    // - server version >= 10.2.7
    // - no stream
    // - parameter type doesn't change
    // - avoid INSERT FROM SELECT
    // Only the last two conditions are verified below.
    // NOTE(review): server version and stream checks are presumably done by the caller - confirm.
    // **************************************************************************************

    // the first row defines the parameter types announced to the server
    ParameterHolder[] initParameters = parametersList.get(0);
    int parameterCount = initParameters.length;
    short[] types = new short[parameterCount];
    for (int i = 0; i < parameterCount; i++) {
      types[i] = initParameters[i].getColumnType().getType();
    }

    // types are sent once per packet, so every row must use the same type per parameter
    for (ParameterHolder[] row : parametersList) {
      for (int i = 0; i < parameterCount; i++) {
        if (row[i].getColumnType().getType() != types[i]) {
          return false;
        }
      }
    }

    // any select query is not applicable to bulk
    if (sql.toLowerCase(Locale.ROOT).contains("select")) {
      return false;
    }

    cmdPrologue();
    ParameterHolder[] parameters = null;
    ServerPrepareResult tmpServerPrepareResult = serverPrepareResult;
    try {
      SQLException exception = null;

      // **************************************************************************************
      // send PREPARE if needed
      // **************************************************************************************
      if (serverPrepareResult == null) {
        tmpServerPrepareResult = prepare(sql, true);
      }

      // **************************************************************************************
      // send BULK
      // **************************************************************************************
      int statementId =
          tmpServerPrepareResult != null ? tmpServerPrepareResult.getStatementId() : -1;

      // binary data of the row that did not fit in the previous packet, if any
      byte[] lastCmdData = null;
      int index = 0;

      do {
        writeBulkExecuteHeader(statementId, types);

        if (lastCmdData != null) {
          // start this packet with the row carried over from the previous one
          writer.checkMaxAllowedLength(lastCmdData.length);
          writer.write(lastCmdData);
          writer.mark();
          index++;
          lastCmdData = null;
        }

        for (; index < parametersList.size(); index++) {
          parameters = parametersList.get(index);
          for (int i = 0; i < parameterCount; i++) {
            ParameterHolder holder = parameters[i];
            if (holder.isNullData()) {
              writer.write(1); // NULL
            } else {
              writer.write(0); // NONE
              holder.writeBinary(writer);
            }
          }

          // if buffer > MAX_ALLOWED_PACKET, flush until last mark.
          if (writer.exceedMaxLength() && writer.isMarked()) {
            writer.flushBufferStopAtMark();
          }

          // if flushed, quit loop
          if (writer.bufferIsDataAfterMark()) {
            break;
          }

          writer.checkMaxAllowedLength(0);
          writer.mark();
        }

        if (writer.bufferIsDataAfterMark()) {
          // flush has been done : keep the current row for the next packet
          lastCmdData = writer.resetMark();
        } else {
          writer.flush();
          writer.resetMark();
        }

        try {
          getResult(results);
        } catch (SQLException sqle) {
          if (isBulkUnsupportedError(sqle)) {
            // query contains commands that cannot be handled by BULK protocol
            // clear error and special error code, so it won't leak anywhere
            // and wouldn't be misinterpreted as an additional update count
            results.getCmdInformation().reset();
            return false;
          }
          // only the first error is kept
          if (exception == null) {
            exception = exceptionWithQuery(sql, sqle, explicitClosed);
            if (!options.continueBatchOnError) {
              throw exception;
            }
          }
        }

      } while (index < parametersList.size() - 1);

      // the very last row overflowed the previous packet : send it in a packet of its own
      if (lastCmdData != null) {
        writeBulkExecuteHeader(statementId, types);
        writer.write(lastCmdData);
        writer.flush();
        try {
          getResult(results);
        } catch (SQLException sqle) {
          if (isBulkUnsupportedError(sqle)) {
            // query cannot be handled by BULK protocol : same cleanup as in the main loop,
            // so the error isn't misinterpreted as an additional update count
            results.getCmdInformation().reset();
            return false;
          }
          if (exception == null) {
            exception = exceptionWithQuery(sql, sqle, explicitClosed);
            if (!options.continueBatchOnError) {
              throw exception;
            }
          }
        }
      }

      if (exception != null) {
        throw exception;
      }
      results.setRewritten(true);
      return true;

    } catch (IOException e) {
      throw exceptionWithQuery(
          parameters, tmpServerPrepareResult, handleIoException(e), explicitClosed);
    } finally {
      // release the statement only when it was prepared by this method
      if (serverPrepareResult == null && tmpServerPrepareResult != null) {
        releasePrepareStatement(tmpServerPrepareResult);
      }
      writer.resetMark();
    }
  }

  /**
   * Starts a COM_STMT_BULK_EXECUTE packet: command byte, statement id, flags and parameter types.
   *
   * <p>Shared by every bulk packet so that the flags are always written the same way. The flag is
   * passed as a {@code short}: casting 0x80 to {@code byte} first would yield -128 and be
   * sign-extended to 0xFF80 when written as a short.
   *
   * @param statementId server side prepared statement identifier
   * @param types column type of each parameter, in parameter order
   * @throws IOException if writing to the output buffer fails
   */
  private void writeBulkExecuteHeader(int statementId, short[] types) throws IOException {
    writer.startPacket(0);
    writer.write(COM_STMT_BULK_EXECUTE);
    writer.writeInt(statementId);
    writer.writeShort((short) 128); // always SEND_TYPES_TO_SERVER

    for (short type : types) {
      writer.writeShort(type);
    }
  }

  /**
   * Indicates whether the server error means the query cannot be handled by the BULK protocol
   * (SQLState HY000 with error code 1295).
   *
   * @param sqle error returned while reading a bulk result
   * @return {@code true} if the caller must give up on bulk execution
   */
  private static boolean isBulkUnsupportedError(SQLException sqle) {
    return "HY000".equals(sqle.getSQLState()) && sqle.getErrorCode() == 1295;
  }