in modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java [584:798]
private JdbcResponse executeQuery(JdbcQueryExecuteRequest req) {
GridQueryCancel cancel = null;
boolean unregisterReq = false;
if (isCancellationSupported()) {
synchronized (reqMux) {
JdbcQueryDescriptor desc = reqRegister.get(req.requestId());
// Query was already cancelled and unregistered.
if (desc == null)
return null;
cancel = desc.cancelHook();
desc.incrementUsageCount();
}
}
try {
int cursorCnt = jdbcCursors.size();
if (maxCursors > 0 && cursorCnt >= maxCursors)
return new JdbcResponse(IgniteQueryErrorCode.UNKNOWN, "Too many open cursors (either close other " +
"open cursors or increase the limit through " +
"ClientConnectorConfiguration.maxOpenCursorsPerConnection) [maximum=" + maxCursors +
", current=" + cursorCnt + ']');
assert !cliCtx.isStream();
String sql = req.sqlQuery();
SqlFieldsQueryEx qry;
switch (req.expectedStatementType()) {
case ANY_STATEMENT_TYPE:
qry = new SqlFieldsQueryEx(sql, null);
break;
case SELECT_STATEMENT_TYPE:
qry = new SqlFieldsQueryEx(sql, true);
break;
default:
assert req.expectedStatementType() == JdbcStatementType.UPDATE_STMT_TYPE;
qry = new SqlFieldsQueryEx(sql, false);
if (cliCtx.isSkipReducerOnUpdate())
((SqlFieldsQueryEx)qry).setSkipReducerOnUpdate(true);
}
setupQuery(qry, prepareSchemaName(req.schemaName()));
qry.setArgs(req.arguments());
qry.setAutoCommit(req.autoCommit());
if (req.explicitTimeout()) {
// Timeout is handled on a client side, do not handle it on a server side.
qry.setTimeout(0, TimeUnit.MILLISECONDS);
}
if (req.pageSize() <= 0)
return new JdbcResponse(IgniteQueryErrorCode.UNKNOWN, "Invalid fetch size: " + req.pageSize());
qry.setPageSize(req.pageSize());
String schemaName = req.schemaName();
if (F.isEmpty(schemaName))
schemaName = QueryUtils.DFLT_SCHEMA;
qry.setSchema(schemaName);
int txId = txId(req.txId());
List<FieldsQueryCursor<List<?>>> results = txEnabledForConnection()
? invokeInTransaction(txId, req.autoCommit(), qry, cancel)
: querySqlFields(qry, cancel);
FieldsQueryCursor<List<?>> fieldsCur = results.get(0);
if (fieldsCur instanceof BulkLoadContextCursor) {
BulkLoadContextCursor blCur = (BulkLoadContextCursor)fieldsCur;
BulkLoadProcessor blProc = blCur.bulkLoadProcessor();
BulkLoadAckClientParameters clientParams = blCur.clientParams();
JdbcBulkLoadProcessor proc = new JdbcBulkLoadProcessor(blProc, req.requestId());
jdbcCursors.put(proc.cursorId(), proc);
// responses for the same query on the client side
return resultToResonse(new JdbcBulkLoadAckResult(proc.cursorId(), clientParams));
}
if (results.size() == 1) {
JdbcQueryCursor cur = new JdbcQueryCursor(req.pageSize(), req.maxRows(),
(QueryCursorEx<List<?>>)fieldsCur, req.requestId());
jdbcCursors.put(cur.cursorId(), cur);
cur.openIterator();
JdbcQueryExecuteResult res;
PartitionResult partRes = null;
if (fieldsCur instanceof QueryCursorImpl)
partRes = ((QueryCursorImpl<List<?>>)fieldsCur).partitionResult();
if (cur.isQuery()) {
res = new JdbcQueryExecuteResult(
cur.cursorId(),
cur.fetchRows(),
!cur.hasNext(),
isClientPartitionAwarenessApplicable(req.partitionResponseRequest(), partRes) ? partRes : null,
txId
);
}
else {
List<List<Object>> items = cur.fetchRows();
assert items != null && items.size() == 1 && items.get(0).size() == 1
&& items.get(0).get(0) instanceof Long :
"Invalid result set for not-SELECT query. [qry=" + sql +
", res=" + S.toString(List.class, items) + ']';
res = new JdbcQueryExecuteResult(
cur.cursorId(),
(Long)items.get(0).get(0),
isClientPartitionAwarenessApplicable(req.partitionResponseRequest(), partRes) ? partRes : null,
txId
);
}
if (res.last() && (!res.isQuery() || autoCloseCursors)) {
jdbcCursors.remove(cur.cursorId());
unregisterReq = true;
cur.close();
}
return resultToResonse(res);
}
else {
List<JdbcResultInfo> jdbcResults = new ArrayList<>(results.size());
List<List<Object>> items = null;
boolean last = true;
for (FieldsQueryCursor<List<?>> c : results) {
QueryCursorEx<List<?>> qryCur = (QueryCursorEx<List<?>>)c;
JdbcResultInfo jdbcRes;
if (qryCur.isQuery()) {
JdbcQueryCursor cur = new JdbcQueryCursor(req.pageSize(), req.maxRows(), qryCur, req.requestId());
jdbcCursors.put(cur.cursorId(), cur);
jdbcRes = new JdbcResultInfo(true, -1, cur.cursorId());
cur.openIterator();
if (items == null) {
items = cur.fetchRows();
last = cur.hasNext();
}
}
else
jdbcRes = new JdbcResultInfo(false, (Long)((List<?>)qryCur.getAll().get(0)).get(0), -1);
jdbcResults.add(jdbcRes);
}
return resultToResonse(new JdbcQueryExecuteMultipleStatementsResult(jdbcResults, items, last, txId));
}
}
catch (Exception e) {
// Trying to close all cursors of current request.
clearCursors(req.requestId());
unregisterReq = true;
if (X.cause(e, QueryCancelledException.class) != null) {
if (log.isDebugEnabled()) {
log.debug("Failed to execute SQL query " +
"[reqId=" + req.requestId() +
", req=" + req +
"]. Error:" + X.getFullStackTrace(e));
}
return exceptionToResult(new QueryCancelledException());
}
else if (X.cause(e, IgniteSQLException.class) != null) {
IgniteSQLException e0 = X.cause(e, IgniteSQLException.class);
if (isNeedToNodeLog(e0))
U.warn(log, "Failed to execute SQL query [reqId=" + req.requestId() + ", req=" + req + ']', e);
return exceptionToResult(e0);
}
else {
U.warn(log, "Failed to execute SQL query [reqId=" + req.requestId() + ", req=" + req + ']', e);
return exceptionToResult(e);
}
}
finally {
cleanupQueryCancellationMeta(unregisterReq, req.requestId());
}
}