in src/main/java/com/amazon/redshift/core/v3/QueryExecutorImpl.java [1883:2446]
private void processResultsOnThread(ResultHandler handler,
int flags, int fetchSize,
MessageLoopState msgLoopState,
boolean subQueries,
int[] rowCount,
int maxRows) throws IOException {
boolean noResults = (flags & QueryExecutor.QUERY_NO_RESULTS) != 0;
boolean bothRowsAndStatus = (flags & QueryExecutor.QUERY_BOTH_ROWS_AND_STATUS) != 0;
boolean useRingBuffer = enableFetchRingBuffer
&& (!handler.wantsScrollableResultSet()) // Scrollable cursor
&& (!subQueries) // Multiple results
&& (!bothRowsAndStatus); // RETURNING clause
List<Tuple> tuples = null;
int c;
boolean endQuery = false;
while (!endQuery) {
c = pgStream.receiveChar();
switch (c) {
case 'A': // Asynchronous Notify
receiveAsyncNotify();
break;
case '1': // Parse Complete (response to Parse)
pgStream.receiveInteger4(); // len, discarded
SimpleQuery parsedQuery = pendingParseQueue.removeFirst();
String parsedStatementName = parsedQuery.getStatementName();
if(RedshiftLogger.isEnable())
logger.log(LogLevel.DEBUG, " <=BE ParseComplete [{0}]", parsedStatementName);
break;
case 't': { // ParameterDescription
pgStream.receiveInteger4(); // len, discarded
if(RedshiftLogger.isEnable())
logger.log(LogLevel.DEBUG, " <=BE ParameterDescription");
DescribeRequest describeData = pendingDescribeStatementQueue.getFirst();
SimpleQuery query = describeData.query;
SimpleParameterList params = describeData.parameterList;
boolean describeOnly = describeData.describeOnly;
// This might differ from query.getStatementName if the query was re-prepared
String origStatementName = describeData.statementName;
int numParams = pgStream.receiveInteger2();
for (int i = 1; i <= numParams; i++) {
int typeOid = pgStream.receiveInteger4();
params.setResolvedType(i, typeOid);
}
// Since we can issue multiple Parse and DescribeStatement
// messages in a single network trip, we need to make
// sure the describe results we requested are still
// applicable to the latest parsed query.
//
if ((origStatementName == null && query.getStatementName() == null)
|| (origStatementName != null
&& origStatementName.equals(query.getStatementName()))) {
query.setPrepareTypes(params.getTypeOIDs());
}
if (describeOnly) {
msgLoopState.doneAfterRowDescNoData = true;
} else {
pendingDescribeStatementQueue.removeFirst();
}
break;
}
case '2': // Bind Complete (response to Bind)
pgStream.receiveInteger4(); // len, discarded
Portal boundPortal = pendingBindQueue.removeFirst();
if(RedshiftLogger.isEnable())
logger.log(LogLevel.DEBUG, " <=BE BindComplete [{0}]", boundPortal);
registerOpenPortal(boundPortal);
break;
case '3': // Close Complete (response to Close)
pgStream.receiveInteger4(); // len, discarded
if(RedshiftLogger.isEnable())
logger.log(LogLevel.DEBUG, " <=BE CloseComplete");
break;
case 'n': // No Data (response to Describe)
pgStream.receiveInteger4(); // len, discarded
if(RedshiftLogger.isEnable())
logger.log(LogLevel.DEBUG, " <=BE NoData");
pendingDescribePortalQueue.removeFirst();
if (msgLoopState.doneAfterRowDescNoData) {
DescribeRequest describeData = pendingDescribeStatementQueue.removeFirst();
SimpleQuery currentQuery = describeData.query;
Field[] fields = currentQuery.getFields();
if (fields != null) { // There was a resultset.
tuples = new ArrayList<Tuple>();
handler.handleResultRows(currentQuery, fields, tuples, null, null, rowCount, null);
tuples = null;
msgLoopState.queueTuples = null;
}
}
break;
case 's': { // Portal Suspended (end of Execute)
// nb: this appears *instead* of CommandStatus.
// Must be a SELECT if we suspended, so don't worry about it.
pgStream.receiveInteger4(); // len, discarded
if(RedshiftLogger.isEnable())
logger.log(LogLevel.DEBUG, " <=BE PortalSuspended");
ExecuteRequest executeData = pendingExecuteQueue.removeFirst();
SimpleQuery currentQuery = executeData.query;
Portal currentPortal = executeData.portal;
Field[] fields = currentQuery.getFields();
if (fields != null
&& (tuples == null
&& msgLoopState.queueTuples == null)) {
// When no results expected, pretend an empty resultset was returned
// Not sure if new ArrayList can be always replaced with emptyList
tuples = noResults ? Collections.<Tuple>emptyList() : new ArrayList<Tuple>();
}
if (msgLoopState.queueTuples != null) {
// Mark end of result
try {
msgLoopState.queueTuples.checkAndAddEndOfRowsIndicator(currentPortal);
}
catch (InterruptedException ie) {
// Handle interrupted exception
handler.handleError(
new RedshiftException(GT.tr("Interrupted exception retrieving query results."),
RedshiftState.UNEXPECTED_ERROR, ie));
}
}
else
handler.handleResultRows(currentQuery, fields, tuples, currentPortal, null, rowCount, null);
tuples = null;
msgLoopState.queueTuples = null;
break;
}
case 'C': { // Command Status (end of Execute)
// Handle status.
String status = receiveCommandStatus();
if (isFlushCacheOnDeallocate()
&& (status.startsWith("DEALLOCATE ALL") || status.startsWith("DISCARD ALL"))) {
deallocateEpoch++;
}
msgLoopState.doneAfterRowDescNoData = false;
ExecuteRequest executeData = pendingExecuteQueue.peekFirst();
SimpleQuery currentQuery = executeData.query;
Portal currentPortal = executeData.portal;
String nativeSql = currentQuery.getNativeQuery().nativeSql;
// Certain backend versions (e.g. 12.2, 11.7, 10.12, 9.6.17, 9.5.21, etc)
// silently rollback the transaction in the response to COMMIT statement
// in case the transaction has failed.
// See discussion in pgsql-hackers: https://www.postgresql.org/message-id/b9fb50dc-0f6e-15fb-6555-8ddb86f4aa71%40postgresfriends.org
if (isRaiseExceptionOnSilentRollback()
&& handler.getException() == null
&& status.startsWith("ROLLBACK")) {
String message = null;
if (looksLikeCommit(nativeSql)) {
if (transactionFailCause == null) {
message = GT.tr("The database returned ROLLBACK, so the transaction cannot be committed. Transaction failure is not known (check server logs?)");
} else {
message = GT.tr("The database returned ROLLBACK, so the transaction cannot be committed. Transaction failure cause is <<{0}>>", transactionFailCause.getMessage());
}
} else if (looksLikePrepare(nativeSql)) {
if (transactionFailCause == null) {
message = GT.tr("The database returned ROLLBACK, so the transaction cannot be prepared. Transaction failure is not known (check server logs?)");
} else {
message = GT.tr("The database returned ROLLBACK, so the transaction cannot be prepared. Transaction failure cause is <<{0}>>", transactionFailCause.getMessage());
}
}
if (message != null) {
handler.handleError(
new RedshiftException(
message, RedshiftState.IN_FAILED_SQL_TRANSACTION, transactionFailCause));
}
}
if (status.startsWith("SET")) {
// Scan only the first 1024 characters to
// avoid big overhead for long queries.
if (nativeSql.lastIndexOf("search_path", 1024) != -1
&& !nativeSql.equals(lastSetSearchPathQuery)) {
// Search path was changed, invalidate prepared statement cache
lastSetSearchPathQuery = nativeSql;
deallocateEpoch++;
}
}
if (!executeData.asSimple) {
pendingExecuteQueue.removeFirst();
} else {
// For simple 'Q' queries, executeQueue is cleared via ReadyForQuery message
}
// we want to make sure we do not add any results from these queries to the result set
if (currentQuery == autoSaveQuery
|| currentQuery == releaseAutoSave) {
// ignore "SAVEPOINT" or RELEASE SAVEPOINT status from autosave query
break;
}
Field[] fields = currentQuery.getFields();
if (fields != null
&& (tuples == null
&& msgLoopState.queueTuples == null)) {
// When no results expected, pretend an empty resultset was returned
// Not sure if new ArrayList can be always replaced with emptyList
tuples = noResults ? Collections.<Tuple>emptyList() : new ArrayList<Tuple>();
}
// If we received tuples we must know the structure of the
// resultset, otherwise we won't be able to fetch columns
// from it, etc, later.
if (fields == null
&& (tuples != null
|| msgLoopState.queueTuples != null)) {
throw new IllegalStateException(
"Received resultset tuples, but no field structure for them");
}
if (fields != null
|| (tuples != null
|| msgLoopState.queueTuples != null)) {
// There was a resultset.
if (msgLoopState.queueTuples == null)
handler.handleResultRows(currentQuery, fields, tuples, null, null, rowCount, null);
else {
// Mark end of result
try {
msgLoopState.queueTuples.checkAndAddEndOfRowsIndicator();
} catch (InterruptedException ie) {
// Handle interrupted exception
handler.handleError(
new RedshiftException(GT.tr("Interrupted exception retrieving query results."),
RedshiftState.UNEXPECTED_ERROR, ie));
}
}
tuples = null;
msgLoopState.queueTuples = null;
rowCount = new int[1]; // Allocate for the next resultset
if (bothRowsAndStatus) {
interpretCommandStatus(status, handler);
}
} else {
interpretCommandStatus(status, handler);
}
if (executeData.asSimple) {
// Simple queries might return several resultsets, thus we clear
// fields, so queries like "select 1;update; select2" will properly
// identify that "update" did not return any results
currentQuery.setFields(null);
}
if (currentPortal != null) {
currentPortal.close();
}
break;
}
case 'D': // Data Transfer (ongoing Execute response)
boolean skipRow = false;
Tuple tuple = null;
try {
tuple = pgStream.receiveTupleV3();
} catch (OutOfMemoryError oome) {
if (!noResults) {
handler.handleError(
new RedshiftException(GT.tr("Ran out of memory retrieving query results."),
RedshiftState.OUT_OF_MEMORY, oome));
}
} catch (SQLException e) {
handler.handleError(e);
}
if (!noResults) {
if(rowCount != null) {
if(maxRows > 0 && rowCount[0] >= maxRows) {
// Ignore any more rows until server fix not to send more rows than max rows.
skipRow = true;
}
else
rowCount[0] += 1;
}
if (useRingBuffer) {
boolean firstRow = false;
if (msgLoopState.queueTuples == null) {
// i.e. First row
firstRow = true;
msgLoopState.queueTuples = new RedshiftRowsBlockingQueue<Tuple>(fetchSize, fetchRingBufferSize, logger);
}
// Add row in the queue
if(!skipRow) {
try {
msgLoopState.queueTuples.put(tuple);
} catch (InterruptedException ie) {
// Handle interrupted exception
handler.handleError(
new RedshiftException(GT.tr("Interrupted exception retrieving query results."),
RedshiftState.UNEXPECTED_ERROR, ie));
}
}
if(firstRow) {
// There was a resultset.
ExecuteRequest executeData = pendingExecuteQueue.peekFirst();
SimpleQuery currentQuery = executeData.query;
Field[] fields = currentQuery.getFields();
// Create a new ring buffer thread to process rows
m_ringBufferThread = new RingBufferThread(handler, flags, fetchSize, msgLoopState, subQueries, rowCount, maxRows);
handler.handleResultRows(currentQuery, fields, null, null, msgLoopState.queueTuples, rowCount, m_ringBufferThread);
if (RedshiftLogger.isEnable()) {
int length;
if (tuple == null) {
length = -1;
} else {
length = tuple.length();
}
logger.log(LogLevel.DEBUG, " <=BE DataRow(len={0})", length);
}
// Start the ring buffer thread
m_ringBufferThread.start();
// Return to break the message loop on the application thread
return;
}
else
if(m_ringBufferStopThread)
return; // Break the ring buffer thread loop
}
else {
if (tuples == null) {
tuples = new ArrayList<Tuple>();
}
if(!skipRow)
tuples.add(tuple);
}
}
if (RedshiftLogger.isEnable()) {
int length;
if (tuple == null) {
length = -1;
} else {
length = tuple.length();
}
logger.log(LogLevel.DEBUG, " <=BE DataRow(len={0})", length);
if (skipRow) {
logger.log(LogLevel.DEBUG, " skipRow={0}, rowCount = {1}, maxRows = {2}"
, skipRow, (rowCount!= null) ? rowCount[0] : 0, maxRows);
}
}
break;
case 'E':
// Error Response (response to pretty much everything; backend then skips until Sync)
SQLException error = receiveErrorResponse(false);
handler.handleError(error);
if (willHealViaReparse(error)) {
// prepared statement ... is not valid kind of error
// Technically speaking, the error is unexpected, thus we invalidate other
// server-prepared statements just in case.
deallocateEpoch++;
if (RedshiftLogger.isEnable()) {
logger.log(LogLevel.DEBUG, " FE: received {0}, will invalidate statements. deallocateEpoch is now {1}",
new Object[]{error.getSQLState(), deallocateEpoch});
}
}
// keep processing
break;
case 'I': { // Empty Query (end of Execute)
pgStream.receiveInteger4();
if(RedshiftLogger.isEnable())
logger.log(LogLevel.DEBUG, " <=BE EmptyQuery");
ExecuteRequest executeData = pendingExecuteQueue.removeFirst();
Portal currentPortal = executeData.portal;
handler.handleCommandStatus("EMPTY", 0, 0);
if (currentPortal != null) {
currentPortal.close();
}
break;
}
case 'N': // Notice Response
SQLWarning warning = receiveNoticeResponse();
handler.handleWarning(warning);
break;
case 'S': // Parameter Status
try {
receiveParameterStatus();
} catch (SQLException e) {
handler.handleError(e);
endQuery = true;
}
break;
case 'T': // Row Description (response to Describe)
Field[] fields = receiveFields(serverProtocolVersion);
tuples = new ArrayList<Tuple>();
SimpleQuery query = pendingDescribePortalQueue.peekFirst();
if (!pendingExecuteQueue.isEmpty() && !pendingExecuteQueue.peekFirst().asSimple) {
pendingDescribePortalQueue.removeFirst();
}
query.setFields(fields);
if (msgLoopState.doneAfterRowDescNoData) {
DescribeRequest describeData = pendingDescribeStatementQueue.removeFirst();
SimpleQuery currentQuery = describeData.query;
currentQuery.setFields(fields);
if (msgLoopState.queueTuples != null) {
// TODO: is this possible?
}
handler.handleResultRows(currentQuery, fields, tuples, null, null, rowCount, null);
tuples = null;
msgLoopState.queueTuples = null;
}
break;
case 'Z': // Ready For Query (eventual response to Sync)
receiveRFQ();
if (!pendingExecuteQueue.isEmpty() && pendingExecuteQueue.peekFirst().asSimple) {
if (msgLoopState.queueTuples != null) {
try {
msgLoopState.queueTuples.checkAndAddEndOfRowsIndicator();
} catch (InterruptedException ie) {
// Handle interrupted exception
handler.handleError(
new RedshiftException(GT.tr("Interrupted exception retrieving query results."),
RedshiftState.UNEXPECTED_ERROR, ie));
}
}
tuples = null;
msgLoopState.queueTuples = null;
pgStream.clearResultBufferCount();
ExecuteRequest executeRequest = pendingExecuteQueue.removeFirst();
// Simple queries might return several resultsets, thus we clear
// fields, so queries like "select 1;update; select2" will properly
// identify that "update" did not return any results
executeRequest.query.setFields(null);
pendingDescribePortalQueue.removeFirst();
if (!pendingExecuteQueue.isEmpty()) {
if (getTransactionState() == TransactionState.IDLE) {
handler.secureProgress();
}
// process subsequent results (e.g. for cases like batched execution of simple 'Q' queries)
break;
}
}
endQuery = true;
// Reset the statement name of Parses that failed.
while (!pendingParseQueue.isEmpty()) {
SimpleQuery failedQuery = pendingParseQueue.removeFirst();
failedQuery.unprepare();
}
pendingParseQueue.clear(); // No more ParseComplete messages expected.
// Pending "describe" requests might be there in case of error
// If that is the case, reset "described" status, so the statement is properly
// described on next execution
while (!pendingDescribeStatementQueue.isEmpty()) {
DescribeRequest request = pendingDescribeStatementQueue.removeFirst();
if(RedshiftLogger.isEnable())
logger.log(LogLevel.DEBUG, " FE marking setStatementDescribed(false) for query {0}", request.query);
request.query.setStatementDescribed(false);
}
while (!pendingDescribePortalQueue.isEmpty()) {
SimpleQuery describePortalQuery = pendingDescribePortalQueue.removeFirst();
if(RedshiftLogger.isEnable())
logger.log(LogLevel.DEBUG, " FE marking setPortalDescribed(false) for query {0}", describePortalQuery);
describePortalQuery.setPortalDescribed(false);
}
pendingBindQueue.clear(); // No more BindComplete messages expected.
pendingExecuteQueue.clear(); // No more query executions expected.
break;
case 'G': // CopyInResponse
if(RedshiftLogger.isEnable()) {
logger.log(LogLevel.DEBUG, " <=BE CopyInResponse");
logger.log(LogLevel.DEBUG, " FE=> CopyFail");
}
// COPY sub-protocol is not implemented yet
// We'll send a CopyFail message for COPY FROM STDIN so that
// server does not wait for the data.
byte[] buf = Utils.encodeUTF8(COPY_ERROR_MESSAGE);
pgStream.sendChar('f');
pgStream.sendInteger4(buf.length + 4 + 1);
pgStream.send(buf);
pgStream.sendChar(0);
pgStream.flush();
sendSync(true); // send sync message
skipMessage(); // skip the response message
break;
case 'H': // CopyOutResponse
if(RedshiftLogger.isEnable())
logger.log(LogLevel.DEBUG, " <=BE CopyOutResponse");
skipMessage();
// In case of CopyOutResponse, we cannot abort data transfer,
// so just throw an error and ignore CopyData messages
handler.handleError(
new RedshiftException(GT.tr(COPY_ERROR_MESSAGE),
RedshiftState.NOT_IMPLEMENTED));
break;
case 'c': // CopyDone
skipMessage();
if(RedshiftLogger.isEnable())
logger.log(LogLevel.DEBUG, " <=BE CopyDone");
break;
case 'd': // CopyData
skipMessage();
if(RedshiftLogger.isEnable())
logger.log(LogLevel.DEBUG, " <=BE CopyData");
break;
default:
throw new IOException("Unexpected packet type: " + c);
}
}
}