private void processResultsOnThread()

in src/main/java/com/amazon/redshift/core/v3/QueryExecutorImpl.java [1883:2446]


  /**
   * Reads backend messages from {@code pgStream} one at a time and dispatches each of them
   * (by its one-character message type) to {@code handler} and to the pending request queues.
   *
   * <p>The loop ends in one of the following ways:
   * <ul>
   *   <li>a ReadyForQuery ({@code 'Z'}) message arrives and no further simple query is pending;
   *       all pending queues are then drained/cleared;</li>
   *   <li>a ParameterStatus ({@code 'S'}) message cannot be processed;</li>
   *   <li>the ring buffer is in use and the first DataRow of a result arrived: the row queue is
   *       handed to {@code handler}, a {@code RingBufferThread} is started and this method
   *       returns so the caller is not blocked by the rest of the result;</li>
   *   <li>the ring buffer is in use and {@code m_ringBufferStopThread} is set when a later
   *       DataRow arrives.</li>
   * </ul>
   *
   * <p>The ring buffer is used only when {@code enableFetchRingBuffer} is set, the handler does
   * not want a scrollable result set, {@code subQueries} is false and
   * {@code QUERY_BOTH_ROWS_AND_STATUS} is not requested. Otherwise rows are collected in a
   * list and delivered when the result ends.
   *
   * @param handler receives result rows, command statuses, warnings and errors
   * @param flags bit set of {@code QueryExecutor.QUERY_*} flags; this method reads
   *        {@code QUERY_NO_RESULTS} and {@code QUERY_BOTH_ROWS_AND_STATUS} and passes the
   *        value on to the ring buffer thread
   * @param fetchSize passed to the row queue and to the ring buffer thread
   * @param msgLoopState state that outlives a single invocation: the
   *        {@code doneAfterRowDescNoData} marker and the current row queue
   * @param subQueries when true the ring buffer is not used
   * @param rowCount one-element counter of rows received for the current result, may be
   *        {@code null}; NOTE: after a result completes on CommandStatus the local reference
   *        is replaced by a fresh array, so the caller's array is not updated for later results
   * @param maxRows when greater than zero, rows received after {@code rowCount[0]} reached
   *        this value are dropped
   * @throws IOException if a message of an unknown type is received
   *         (presumably also propagated from the stream operations -- confirm against PGStream)
   */
  private void processResultsOnThread(ResultHandler handler,
      int flags, int fetchSize,
      MessageLoopState msgLoopState,
      boolean subQueries,
      int[] rowCount,
      int maxRows) throws IOException {
    boolean noResults = (flags & QueryExecutor.QUERY_NO_RESULTS) != 0;
    boolean bothRowsAndStatus = (flags & QueryExecutor.QUERY_BOTH_ROWS_AND_STATUS) != 0;
    boolean useRingBuffer = enableFetchRingBuffer
        && (!handler.wantsScrollableResultSet()) // Scrollable cursor
        && (!subQueries) // Multiple results
        && (!bothRowsAndStatus); // RETURNING clause

    // Rows collected for the current result when the ring buffer is not used.
    List<Tuple> tuples = null;

    int c;
    boolean endQuery = false;

    while (!endQuery) {
      c = pgStream.receiveChar();
      switch (c) {
        case 'A': // Asynchronous Notify
          receiveAsyncNotify();
          break;

        case '1': // Parse Complete (response to Parse)
          pgStream.receiveInteger4(); // len, discarded

          SimpleQuery parsedQuery = pendingParseQueue.removeFirst();
          String parsedStatementName = parsedQuery.getStatementName();

          if (RedshiftLogger.isEnable()) {
            logger.log(LogLevel.DEBUG, " <=BE ParseComplete [{0}]", parsedStatementName);
          }

          break;

        case 't': { // ParameterDescription
          pgStream.receiveInteger4(); // len, discarded

          if (RedshiftLogger.isEnable()) {
            logger.log(LogLevel.DEBUG, " <=BE ParameterDescription");
          }

          // The request is only peeked here; it is removed below (or by the following
          // RowDescription/NoData message when this was a describe-only request).
          DescribeRequest describeData = pendingDescribeStatementQueue.getFirst();
          SimpleQuery query = describeData.query;
          SimpleParameterList params = describeData.parameterList;
          boolean describeOnly = describeData.describeOnly;
          // This might differ from query.getStatementName if the query was re-prepared
          String origStatementName = describeData.statementName;

          int numParams = pgStream.receiveInteger2();

          // Parameter indexes are 1-based.
          for (int i = 1; i <= numParams; i++) {
            int typeOid = pgStream.receiveInteger4();
            params.setResolvedType(i, typeOid);
          }

          // Since we can issue multiple Parse and DescribeStatement
          // messages in a single network trip, we need to make
          // sure the describe results we requested are still
          // applicable to the latest parsed query.
          //
          if ((origStatementName == null && query.getStatementName() == null)
              || (origStatementName != null
                  && origStatementName.equals(query.getStatementName()))) {
            query.setPrepareTypes(params.getTypeOIDs());
          }

          if (describeOnly) {
            msgLoopState.doneAfterRowDescNoData = true;
          } else {
            pendingDescribeStatementQueue.removeFirst();
          }
          break;
        }

        case '2': // Bind Complete (response to Bind)
          pgStream.receiveInteger4(); // len, discarded

          Portal boundPortal = pendingBindQueue.removeFirst();
          if (RedshiftLogger.isEnable()) {
            logger.log(LogLevel.DEBUG, " <=BE BindComplete [{0}]", boundPortal);
          }

          registerOpenPortal(boundPortal);
          break;

        case '3': // Close Complete (response to Close)
          pgStream.receiveInteger4(); // len, discarded
          if (RedshiftLogger.isEnable()) {
            logger.log(LogLevel.DEBUG, " <=BE CloseComplete");
          }
          break;

        case 'n': // No Data (response to Describe)
          pgStream.receiveInteger4(); // len, discarded
          if (RedshiftLogger.isEnable()) {
            logger.log(LogLevel.DEBUG, " <=BE NoData");
          }

          pendingDescribePortalQueue.removeFirst();

          if (msgLoopState.doneAfterRowDescNoData) {
            // Describe-only request: finish it here, there will be no Execute response.
            DescribeRequest describeData = pendingDescribeStatementQueue.removeFirst();
            SimpleQuery currentQuery = describeData.query;

            Field[] fields = currentQuery.getFields();

            if (fields != null) { // There was a resultset.
              tuples = new ArrayList<Tuple>();
              handler.handleResultRows(currentQuery, fields, tuples, null, null, rowCount, null);
              tuples = null;
              msgLoopState.queueTuples = null;
            }
          }
          break;

        case 's': { // Portal Suspended (end of Execute)
          // nb: this appears *instead* of CommandStatus.
          // Must be a SELECT if we suspended, so don't worry about it.

          pgStream.receiveInteger4(); // len, discarded
          if (RedshiftLogger.isEnable()) {
            logger.log(LogLevel.DEBUG, " <=BE PortalSuspended");
          }

          ExecuteRequest executeData = pendingExecuteQueue.removeFirst();
          SimpleQuery currentQuery = executeData.query;
          Portal currentPortal = executeData.portal;

          Field[] fields = currentQuery.getFields();
          if (fields != null
              && (tuples == null
                  && msgLoopState.queueTuples == null)) {
            // When no results expected, pretend an empty resultset was returned
            // Not sure if new ArrayList can be always replaced with emptyList
            tuples = noResults ? Collections.<Tuple>emptyList() : new ArrayList<Tuple>();
          }

          if (msgLoopState.queueTuples != null) {
            // Ring buffer in use: the handler already owns the queue, so only
            // mark the end of this batch of rows.
            try {
              msgLoopState.queueTuples.checkAndAddEndOfRowsIndicator(currentPortal);
            } catch (InterruptedException ie) {
              // Handle interrupted exception
              handler.handleError(
                  new RedshiftException(GT.tr("Interrupted exception retrieving query results."),
                      RedshiftState.UNEXPECTED_ERROR, ie));
            }
          } else {
            handler.handleResultRows(currentQuery, fields, tuples, currentPortal, null, rowCount, null);
          }

          tuples = null;
          msgLoopState.queueTuples = null;

          break;
        }

        case 'C': { // Command Status (end of Execute)
          // Handle status.
          String status = receiveCommandStatus();
          if (isFlushCacheOnDeallocate()
              && (status.startsWith("DEALLOCATE ALL") || status.startsWith("DISCARD ALL"))) {
            deallocateEpoch++;
          }

          msgLoopState.doneAfterRowDescNoData = false;

          // Only peeked: simple ('Q') requests stay queued until ReadyForQuery.
          ExecuteRequest executeData = pendingExecuteQueue.peekFirst();
          SimpleQuery currentQuery = executeData.query;
          Portal currentPortal = executeData.portal;

          String nativeSql = currentQuery.getNativeQuery().nativeSql;
          // Certain backend versions (e.g. 12.2, 11.7, 10.12, 9.6.17, 9.5.21, etc)
          // silently rollback the transaction in the response to COMMIT statement
          // in case the transaction has failed.
          // See discussion in pgsql-hackers: https://www.postgresql.org/message-id/b9fb50dc-0f6e-15fb-6555-8ddb86f4aa71%40postgresfriends.org
          if (isRaiseExceptionOnSilentRollback()
              && handler.getException() == null
              && status.startsWith("ROLLBACK")) {
            String message = null;
            if (looksLikeCommit(nativeSql)) {
              if (transactionFailCause == null) {
                message = GT.tr("The database returned ROLLBACK, so the transaction cannot be committed. Transaction failure is not known (check server logs?)");
              } else {
                message = GT.tr("The database returned ROLLBACK, so the transaction cannot be committed. Transaction failure cause is <<{0}>>", transactionFailCause.getMessage());
              }
            } else if (looksLikePrepare(nativeSql)) {
              if (transactionFailCause == null) {
                message = GT.tr("The database returned ROLLBACK, so the transaction cannot be prepared. Transaction failure is not known (check server logs?)");
              } else {
                message = GT.tr("The database returned ROLLBACK, so the transaction cannot be prepared. Transaction failure cause is <<{0}>>", transactionFailCause.getMessage());
              }
            }
            if (message != null) {
              handler.handleError(
                  new RedshiftException(
                      message, RedshiftState.IN_FAILED_SQL_TRANSACTION, transactionFailCause));
            }
          }

          if (status.startsWith("SET")) {
            // Scan only the first 1024 characters to
            // avoid big overhead for long queries.
            if (nativeSql.lastIndexOf("search_path", 1024) != -1
                && !nativeSql.equals(lastSetSearchPathQuery)) {
              // Search path was changed, invalidate prepared statement cache
              lastSetSearchPathQuery = nativeSql;
              deallocateEpoch++;
            }
          }

          if (!executeData.asSimple) {
            pendingExecuteQueue.removeFirst();
          } else {
            // For simple 'Q' queries, executeQueue is cleared via ReadyForQuery message
          }

          // we want to make sure we do not add any results from these queries to the result set
          if (currentQuery == autoSaveQuery
              || currentQuery == releaseAutoSave) {
            // ignore "SAVEPOINT" or RELEASE SAVEPOINT status from autosave query
            break;
          }

          Field[] fields = currentQuery.getFields();
          if (fields != null
              && (tuples == null
                  && msgLoopState.queueTuples == null)) {
            // When no results expected, pretend an empty resultset was returned
            // Not sure if new ArrayList can be always replaced with emptyList
            tuples = noResults ? Collections.<Tuple>emptyList() : new ArrayList<Tuple>();
          }

          // If we received tuples we must know the structure of the
          // resultset, otherwise we won't be able to fetch columns
          // from it, etc, later.
          if (fields == null
              && (tuples != null
                  || msgLoopState.queueTuples != null)) {
            throw new IllegalStateException(
                "Received resultset tuples, but no field structure for them");
          }

          if (fields != null
              || (tuples != null
                  || msgLoopState.queueTuples != null)) {
            // There was a resultset.
            if (msgLoopState.queueTuples == null) {
              handler.handleResultRows(currentQuery, fields, tuples, null, null, rowCount, null);
            } else {
              // Ring buffer in use: the handler already owns the queue, so only
              // mark the end of the result.
              try {
                msgLoopState.queueTuples.checkAndAddEndOfRowsIndicator();
              } catch (InterruptedException ie) {
                // Handle interrupted exception
                handler.handleError(
                    new RedshiftException(GT.tr("Interrupted exception retrieving query results."),
                        RedshiftState.UNEXPECTED_ERROR, ie));
              }
            }

            tuples = null;
            msgLoopState.queueTuples = null;
            rowCount = new int[1]; // Allocate for the next resultset

            if (bothRowsAndStatus) {
              interpretCommandStatus(status, handler);
            }
          } else {
            interpretCommandStatus(status, handler);
          }

          if (executeData.asSimple) {
            // Simple queries might return several resultsets, thus we clear
            // fields, so queries like "select 1;update; select2" will properly
            // identify that "update" did not return any results
            currentQuery.setFields(null);
          }

          if (currentPortal != null) {
            currentPortal.close();
          }
          break;
        }

        case 'D': // Data Transfer (ongoing Execute response)
          boolean skipRow = false;
          Tuple tuple = null;
          try {
            tuple = pgStream.receiveTupleV3();
          } catch (OutOfMemoryError oome) {
            if (!noResults) {
              handler.handleError(
                  new RedshiftException(GT.tr("Ran out of memory retrieving query results."),
                      RedshiftState.OUT_OF_MEMORY, oome));
            }
          } catch (SQLException e) {
            handler.handleError(e);
          }
          // When receiving the row failed, the error has already been reported to the handler
          // and there is no row to deliver: it must be neither counted nor stored. Storing null
          // would hand a null row to the result set (and a queue that follows the
          // java.util.concurrent.BlockingQueue contract rejects null elements).
          boolean rowReceived = tuple != null;
          if (!noResults) {
            if (rowCount != null) {
              if (maxRows > 0 && rowCount[0] >= maxRows) {
                // Ignore any more rows until server fix not to send more rows than max rows.
                skipRow = true;
              } else if (rowReceived) {
                rowCount[0] += 1;
              }
            }

            if (useRingBuffer) {
              boolean firstRow = false;
              if (msgLoopState.queueTuples == null) {
                // i.e. First row
                firstRow = true;
                msgLoopState.queueTuples = new RedshiftRowsBlockingQueue<Tuple>(fetchSize, fetchRingBufferSize, logger);
              }

              // Add row in the queue
              if (!skipRow && rowReceived) {
                try {
                  msgLoopState.queueTuples.put(tuple);
                } catch (InterruptedException ie) {
                  // Handle interrupted exception
                  handler.handleError(
                      new RedshiftException(GT.tr("Interrupted exception retrieving query results."),
                          RedshiftState.UNEXPECTED_ERROR, ie));
                }
              }

              if (firstRow) {
                // There was a resultset.
                ExecuteRequest executeData = pendingExecuteQueue.peekFirst();
                SimpleQuery currentQuery = executeData.query;
                Field[] fields = currentQuery.getFields();

                // Create a new ring buffer thread to process rows
                m_ringBufferThread = new RingBufferThread(handler, flags, fetchSize, msgLoopState, subQueries, rowCount, maxRows);

                // Hand the queue to the handler before the thread starts filling it.
                handler.handleResultRows(currentQuery, fields, null, null, msgLoopState.queueTuples, rowCount, m_ringBufferThread);

                // Logged here because the early return below skips the common logging.
                if (RedshiftLogger.isEnable()) {
                  int length;
                  if (tuple == null) {
                    length = -1;
                  } else {
                    length = tuple.length();
                  }
                  logger.log(LogLevel.DEBUG, " <=BE DataRow(len={0})", length);
                }

                // Start the ring buffer thread
                m_ringBufferThread.start();

                // Return to break the message loop on the application thread
                return;
              } else if (m_ringBufferStopThread) {
                return; // Break the ring buffer thread loop
              }
            } else {
              if (tuples == null) {
                tuples = new ArrayList<Tuple>();
              }

              if (!skipRow && rowReceived) {
                tuples.add(tuple);
              }
            }
          }

          if (RedshiftLogger.isEnable()) {
            int length;
            if (tuple == null) {
              length = -1;
            } else {
              length = tuple.length();
            }
            logger.log(LogLevel.DEBUG, " <=BE DataRow(len={0})", length);
            if (skipRow) {
              logger.log(LogLevel.DEBUG, " skipRow={0}, rowCount = {1},  maxRows = {2}",
                  skipRow, (rowCount != null) ? rowCount[0] : 0, maxRows);
            }
          }

          break;

        case 'E':
          // Error Response (response to pretty much everything; backend then skips until Sync)
          SQLException error = receiveErrorResponse(false);
          handler.handleError(error);
          if (willHealViaReparse(error)) {
            // prepared statement ... is not valid kind of error
            // Technically speaking, the error is unexpected, thus we invalidate other
            // server-prepared statements just in case.
            deallocateEpoch++;
            if (RedshiftLogger.isEnable()) {
              logger.log(LogLevel.DEBUG, " FE: received {0}, will invalidate statements. deallocateEpoch is now {1}",
                  new Object[]{error.getSQLState(), deallocateEpoch});
            }
          }
          // keep processing
          break;

        case 'I': { // Empty Query (end of Execute)
          pgStream.receiveInteger4(); // len, discarded

          if (RedshiftLogger.isEnable()) {
            logger.log(LogLevel.DEBUG, " <=BE EmptyQuery");
          }

          ExecuteRequest executeData = pendingExecuteQueue.removeFirst();
          Portal currentPortal = executeData.portal;
          handler.handleCommandStatus("EMPTY", 0, 0);
          if (currentPortal != null) {
            currentPortal.close();
          }
          break;
        }

        case 'N': // Notice Response
          SQLWarning warning = receiveNoticeResponse();
          handler.handleWarning(warning);
          break;

        case 'S': // Parameter Status
          try {
            receiveParameterStatus();
          } catch (SQLException e) {
            // Report the failure and stop the message loop.
            handler.handleError(e);
            endQuery = true;
          }
          break;

        case 'T': // Row Description (response to Describe)
          Field[] fields = receiveFields(serverProtocolVersion);
          tuples = new ArrayList<Tuple>();

          SimpleQuery query = pendingDescribePortalQueue.peekFirst();
          // For simple ('Q') queries the entry stays queued until ReadyForQuery.
          if (!pendingExecuteQueue.isEmpty() && !pendingExecuteQueue.peekFirst().asSimple) {
            pendingDescribePortalQueue.removeFirst();
          }
          query.setFields(fields);

          if (msgLoopState.doneAfterRowDescNoData) {
            // Describe-only request: finish it here with an empty row list.
            DescribeRequest describeData = pendingDescribeStatementQueue.removeFirst();
            SimpleQuery currentQuery = describeData.query;
            currentQuery.setFields(fields);

            if (msgLoopState.queueTuples != null) {
              // TODO: is this possible?
            }

            handler.handleResultRows(currentQuery, fields, tuples, null, null, rowCount, null);
            tuples = null;
            msgLoopState.queueTuples = null;
          }
          break;

        case 'Z': // Ready For Query (eventual response to Sync)
          receiveRFQ();
          if (!pendingExecuteQueue.isEmpty() && pendingExecuteQueue.peekFirst().asSimple) {
            if (msgLoopState.queueTuples != null) {
              // Ring buffer in use: mark the end of the result for the consumer.
              try {
                msgLoopState.queueTuples.checkAndAddEndOfRowsIndicator();
              } catch (InterruptedException ie) {
                // Handle interrupted exception
                handler.handleError(
                    new RedshiftException(GT.tr("Interrupted exception retrieving query results."),
                        RedshiftState.UNEXPECTED_ERROR, ie));
              }
            }
            tuples = null;
            msgLoopState.queueTuples = null;
            pgStream.clearResultBufferCount();

            ExecuteRequest executeRequest = pendingExecuteQueue.removeFirst();
            // Simple queries might return several resultsets, thus we clear
            // fields, so queries like "select 1;update; select2" will properly
            // identify that "update" did not return any results
            executeRequest.query.setFields(null);

            pendingDescribePortalQueue.removeFirst();
            if (!pendingExecuteQueue.isEmpty()) {
              if (getTransactionState() == TransactionState.IDLE) {
                handler.secureProgress();
              }
              // process subsequent results (e.g. for cases like batched execution of simple 'Q' queries)
              break;
            }
          }
          endQuery = true;

          // Reset the statement name of Parses that failed.
          while (!pendingParseQueue.isEmpty()) {
            SimpleQuery failedQuery = pendingParseQueue.removeFirst();
            failedQuery.unprepare();
          }

          pendingParseQueue.clear(); // No more ParseComplete messages expected.
          // Pending "describe" requests might be there in case of error
          // If that is the case, reset "described" status, so the statement is properly
          // described on next execution
          while (!pendingDescribeStatementQueue.isEmpty()) {
            DescribeRequest request = pendingDescribeStatementQueue.removeFirst();
            if (RedshiftLogger.isEnable()) {
              logger.log(LogLevel.DEBUG, " FE marking setStatementDescribed(false) for query {0}", request.query);
            }
            request.query.setStatementDescribed(false);
          }
          while (!pendingDescribePortalQueue.isEmpty()) {
            SimpleQuery describePortalQuery = pendingDescribePortalQueue.removeFirst();
            if (RedshiftLogger.isEnable()) {
              logger.log(LogLevel.DEBUG, " FE marking setPortalDescribed(false) for query {0}", describePortalQuery);
            }
            describePortalQuery.setPortalDescribed(false);
          }
          pendingBindQueue.clear(); // No more BindComplete messages expected.
          pendingExecuteQueue.clear(); // No more query executions expected.
          break;

        case 'G': // CopyInResponse
          if (RedshiftLogger.isEnable()) {
            logger.log(LogLevel.DEBUG, " <=BE CopyInResponse");
            logger.log(LogLevel.DEBUG, " FE=> CopyFail");
          }

          // COPY sub-protocol is not implemented yet
          // We'll send a CopyFail message for COPY FROM STDIN so that
          // server does not wait for the data.

          byte[] buf = Utils.encodeUTF8(COPY_ERROR_MESSAGE);
          pgStream.sendChar('f');
          // Length covers the 4-byte length field, the message bytes and the trailing 0 byte.
          pgStream.sendInteger4(buf.length + 4 + 1);
          pgStream.send(buf);
          pgStream.sendChar(0);
          pgStream.flush();
          sendSync(true); // send sync message
          skipMessage(); // skip the response message
          break;

        case 'H': // CopyOutResponse
          if (RedshiftLogger.isEnable()) {
            logger.log(LogLevel.DEBUG, " <=BE CopyOutResponse");
          }

          skipMessage();
          // In case of CopyOutResponse, we cannot abort data transfer,
          // so just throw an error and ignore CopyData messages
          handler.handleError(
              new RedshiftException(GT.tr(COPY_ERROR_MESSAGE),
                  RedshiftState.NOT_IMPLEMENTED));
          break;

        case 'c': // CopyDone
          skipMessage();
          if (RedshiftLogger.isEnable()) {
            logger.log(LogLevel.DEBUG, " <=BE CopyDone");
          }
          break;

        case 'd': // CopyData
          skipMessage();
          if (RedshiftLogger.isEnable()) {
            logger.log(LogLevel.DEBUG, " <=BE CopyData");
          }
          break;

        default:
          throw new IOException("Unexpected packet type: " + c);
      }
    }
  }