protected void processResults()

in pgjdbc/src/main/java/org/postgresql/core/v3/QueryExecutorImpl.java [2070:2490]


  protected void processResults(ResultHandler handler, int flags, boolean adaptiveFetch)
      throws IOException {
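    // QUERY_NO_RESULTS means the caller does not want row data: DataRow
    // contents are discarded (see the 'D' case). QUERY_BOTH_ROWS_AND_STATUS
    // reports the command status to the handler in addition to the rows
    // (see the 'C' case).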
    boolean noResults = (flags & QueryExecutor.QUERY_NO_RESULTS) != 0;
    boolean bothRowsAndStatus = (flags & QueryExecutor.QUERY_BOTH_ROWS_AND_STATUS) != 0;

    List<Tuple> tuples = null;

    int c;
    boolean endQuery = false;

    // At the end of a command execution we have the CommandComplete
    // message to tell us we're done, but with a describeOnly command
    // we have no real flag to let us know we're done. We've got to
    // look for the next RowDescription or NoData message and return
    // from there.
    boolean doneAfterRowDescNoData = false;
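
    // Every backend message is a one-byte type tag followed by an int32
    // length that counts itself but not the tag; the loop below dispatches
    // on the tag until the current query round-trip is complete.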

    while (!endQuery) {
      c = pgStream.receiveChar();
      switch (c) {
        case 'A': // Asynchronous Notify
          receiveAsyncNotify();
          break;

        case '1': // Parse Complete (response to Parse)
          pgStream.receiveInteger4(); // len, discarded

          SimpleQuery parsedQuery = pendingParseQueue.removeFirst();
          String parsedStatementName = parsedQuery.getStatementName();

          LOGGER.log(Level.FINEST, " <=BE ParseComplete [{0}]", parsedStatementName);

          break;

        case 't': { // ParameterDescription
          pgStream.receiveInteger4(); // len, discarded

          LOGGER.log(Level.FINEST, " <=BE ParameterDescription");

          DescribeRequest describeData = pendingDescribeStatementQueue.getFirst();
          SimpleQuery query = describeData.query;
          SimpleParameterList params = describeData.parameterList;
          boolean describeOnly = describeData.describeOnly;
          // This might differ from query.getStatementName if the query was re-prepared
          String origStatementName = describeData.statementName;

          int numParams = pgStream.receiveInteger2();
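          // The message body is one int32 type OID per placeholder, as
          // inferred by the backend; parameter indices are 1-based ($1..$n).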

          for (int i = 1; i <= numParams; i++) {
            int typeOid = pgStream.receiveInteger4();
            params.setResolvedType(i, typeOid);
          }

          // Since we can issue multiple Parse and DescribeStatement
          // messages in a single network trip, we need to make
          // sure the describe results we requested are still
          // applicable to the latest parsed query.
          //
          if ((origStatementName == null && query.getStatementName() == null)
              || (origStatementName != null
                  && origStatementName.equals(query.getStatementName()))) {
            query.setPrepareTypes(params.getTypeOIDs());
          }

          if (describeOnly) {
            doneAfterRowDescNoData = true;
          } else {
            pendingDescribeStatementQueue.removeFirst();
          }
          break;
        }

        case '2': // Bind Complete (response to Bind)
          pgStream.receiveInteger4(); // len, discarded

          Portal boundPortal = pendingBindQueue.removeFirst();
          LOGGER.log(Level.FINEST, " <=BE BindComplete [{0}]", boundPortal);

          registerOpenPortal(boundPortal);
          break;

        case '3': // Close Complete (response to Close)
          pgStream.receiveInteger4(); // len, discarded
          LOGGER.log(Level.FINEST, " <=BE CloseComplete");
          break;

        case 'n': // No Data (response to Describe)
          pgStream.receiveInteger4(); // len, discarded
          LOGGER.log(Level.FINEST, " <=BE NoData");

          pendingDescribePortalQueue.removeFirst();

          if (doneAfterRowDescNoData) {
            DescribeRequest describeData = pendingDescribeStatementQueue.removeFirst();
            SimpleQuery currentQuery = describeData.query;

            Field[] fields = currentQuery.getFields();

            if (fields != null) { // There was a resultset.
              tuples = new ArrayList<Tuple>();
              handler.handleResultRows(currentQuery, fields, tuples, null);
              tuples = null;
            }
          }
          break;

        case 's': { // Portal Suspended (end of Execute)
          // nb: this appears *instead* of CommandStatus.
          // Must be a SELECT if we suspended, so don't worry about it.
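          // The backend sends PortalSuspended when the row limit passed in
          // the Execute message was reached before the portal was exhausted;
          // the portal stays open and can be resumed with another Execute.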

          pgStream.receiveInteger4(); // len, discarded
          LOGGER.log(Level.FINEST, " <=BE PortalSuspended");

          ExecuteRequest executeData = pendingExecuteQueue.removeFirst();
          SimpleQuery currentQuery = executeData.query;
          Portal currentPortal = executeData.portal;

          if (currentPortal != null) {
            // Existence of portal defines if query was using fetching.
            adaptiveFetchCache
              .updateQueryFetchSize(adaptiveFetch, currentQuery, pgStream.getMaxRowSizeBytes());
          }
          pgStream.clearMaxRowSizeBytes();

          Field[] fields = currentQuery.getFields();
          if (fields != null && tuples == null) {
            // When no results are expected, pretend an empty resultset was returned
            // Not sure if new ArrayList can always be replaced with emptyList
            tuples = noResults ? Collections.<Tuple>emptyList() : new ArrayList<Tuple>();
          }

          if (fields != null && tuples != null) {
            handler.handleResultRows(currentQuery, fields, tuples, currentPortal);
          }
          tuples = null;
          break;
        }

        case 'C': { // Command Status (end of Execute)
          // Handle status.
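          // The status string is the command tag, e.g. "SELECT 42" or "INSERT 0 1".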
          String status = receiveCommandStatus();
          if (isFlushCacheOnDeallocate()
              && (status.startsWith("DEALLOCATE ALL") || status.startsWith("DISCARD ALL"))) {
            deallocateEpoch++;
          }

          doneAfterRowDescNoData = false;

          ExecuteRequest executeData = castNonNull(pendingExecuteQueue.peekFirst());
          SimpleQuery currentQuery = executeData.query;
          Portal currentPortal = executeData.portal;

          if (currentPortal != null) {
            // Existence of portal defines if query was using fetching.

            // Command executed, adaptive fetch size can be removed for this query, max row size can be cleared
            adaptiveFetchCache.removeQuery(adaptiveFetch, currentQuery);
            // Update to change fetch size for other fetch portals of this query
            adaptiveFetchCache
                .updateQueryFetchSize(adaptiveFetch, currentQuery, pgStream.getMaxRowSizeBytes());
          }
          pgStream.clearMaxRowSizeBytes();

          if (status.startsWith("SET")) {
            String nativeSql = currentQuery.getNativeQuery().nativeSql;
            // Scan only the first 1024 characters to
            // avoid big overhead for long queries.
            if (nativeSql.lastIndexOf("search_path", 1024) != -1
                && !nativeSql.equals(lastSetSearchPathQuery)) {
              // Search path was changed, invalidate prepared statement cache
              lastSetSearchPathQuery = nativeSql;
              deallocateEpoch++;
            }
          }

          if (!executeData.asSimple) {
            pendingExecuteQueue.removeFirst();
          } else {
            // For simple 'Q' queries, executeQueue is cleared via ReadyForQuery message
          }

          // we want to make sure we do not add any results from these queries to the result set
          if (currentQuery == autoSaveQuery
              || currentQuery == releaseAutoSave) {
            // ignore "SAVEPOINT" or RELEASE SAVEPOINT status from autosave query
            break;
          }

          Field[] fields = currentQuery.getFields();
          if (fields != null && tuples == null) {
            // When no results are expected, pretend an empty resultset was returned
            // Not sure if new ArrayList can always be replaced with emptyList
            tuples = noResults ? Collections.<Tuple>emptyList() : new ArrayList<Tuple>();
          }

          // If we received tuples we must know the structure of the
          // resultset, otherwise we won't be able to fetch columns
          // from it, etc, later.
          if (fields == null && tuples != null) {
            throw new IllegalStateException(
                "Received resultset tuples, but no field structure for them");
          }

          if (fields != null && tuples != null) {
            // There was a resultset.
            handler.handleResultRows(currentQuery, fields, tuples, null);
            tuples = null;

            if (bothRowsAndStatus) {
              interpretCommandStatus(status, handler);
            }
          } else {
            interpretCommandStatus(status, handler);
          }

          if (executeData.asSimple) {
            // Simple queries might return several resultsets, thus we clear
            // fields, so queries like "select 1; update; select 2" will properly
            // identify that "update" did not return any results
            currentQuery.setFields(null);
          }

          if (currentPortal != null) {
            currentPortal.close();
          }
          break;
        }

        case 'D': // Data Transfer (ongoing Execute response)
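          // A failure while reading a single row (e.g. an OutOfMemoryError on
          // an oversized tuple) is reported through the handler, and the loop
          // keeps consuming messages so the protocol stream stays usable.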
          Tuple tuple = null;
          try {
            tuple = pgStream.receiveTupleV3();
          } catch (OutOfMemoryError oome) {
            if (!noResults) {
              handler.handleError(
                  new PSQLException(GT.tr("Ran out of memory retrieving query results."),
                      PSQLState.OUT_OF_MEMORY, oome));
            }
          } catch (SQLException e) {
            handler.handleError(e);
          }
          if (!noResults) {
            if (tuples == null) {
              tuples = new ArrayList<Tuple>();
            }
            if (tuple != null) {
              tuples.add(tuple);
            }
          }

          if (LOGGER.isLoggable(Level.FINEST)) {
            int length;
            if (tuple == null) {
              length = -1;
            } else {
              length = tuple.length();
            }
            LOGGER.log(Level.FINEST, " <=BE DataRow(len={0})", length);
          }

          break;

        case 'E':
          // Error Response (response to pretty much everything; backend then skips until Sync)
          SQLException error = receiveErrorResponse();
          handler.handleError(error);
          if (willHealViaReparse(error)) {
            // prepared statement ... is not valid kind of error
            // Technically speaking, the error is unexpected, thus we invalidate other
            // server-prepared statements just in case.
            deallocateEpoch++;
            if (LOGGER.isLoggable(Level.FINEST)) {
              LOGGER.log(Level.FINEST, " FE: received {0}, will invalidate statements. deallocateEpoch is now {1}",
                  new Object[]{error.getSQLState(), deallocateEpoch});
            }
          }
          // keep processing
          break;

        case 'I': { // Empty Query (end of Execute)
          pgStream.receiveInteger4();

          LOGGER.log(Level.FINEST, " <=BE EmptyQuery");

          ExecuteRequest executeData = pendingExecuteQueue.removeFirst();
          Portal currentPortal = executeData.portal;
          handler.handleCommandStatus("EMPTY", 0, 0);
          if (currentPortal != null) {
            currentPortal.close();
          }
          break;
        }

        case 'N': // Notice Response
          SQLWarning warning = receiveNoticeResponse();
          handler.handleWarning(warning);
          break;

        case 'S': // Parameter Status
          try {
            receiveParameterStatus();
          } catch (SQLException e) {
            handler.handleError(e);
            endQuery = true;
          }
          break;

        case 'T': // Row Description (response to Describe)
          Field[] fields = receiveFields();
          tuples = new ArrayList<Tuple>();

          SimpleQuery query = castNonNull(pendingDescribePortalQueue.peekFirst());
          if (!pendingExecuteQueue.isEmpty()
              && !castNonNull(pendingExecuteQueue.peekFirst()).asSimple) {
            pendingDescribePortalQueue.removeFirst();
          }
          query.setFields(fields);

          if (doneAfterRowDescNoData) {
            DescribeRequest describeData = pendingDescribeStatementQueue.removeFirst();
            SimpleQuery currentQuery = describeData.query;
            currentQuery.setFields(fields);

            handler.handleResultRows(currentQuery, fields, tuples, null);
            tuples = null;
          }
          break;

        case 'Z': // Ready For Query (eventual response to Sync)
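          // ReadyForQuery carries a transaction-status byte ('I' idle,
          // 'T' in a transaction, 'E' in a failed transaction), which
          // receiveRFQ() records.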
          receiveRFQ();
          if (!pendingExecuteQueue.isEmpty()
              && castNonNull(pendingExecuteQueue.peekFirst()).asSimple) {
            tuples = null;
            pgStream.clearResultBufferCount();

            ExecuteRequest executeRequest = pendingExecuteQueue.removeFirst();
            // Simple queries might return several resultsets, thus we clear
            // fields, so queries like "select 1; update; select 2" will properly
            // identify that "update" did not return any results
            executeRequest.query.setFields(null);

            pendingDescribePortalQueue.removeFirst();
            if (!pendingExecuteQueue.isEmpty()) {
              if (getTransactionState() == TransactionState.IDLE) {
                handler.secureProgress();
              }
              // process subsequent results (e.g. for cases like batched execution of simple 'Q' queries)
              break;
            }
          }
          endQuery = true;

          // Reset the statement name of Parses that failed.
          while (!pendingParseQueue.isEmpty()) {
            SimpleQuery failedQuery = pendingParseQueue.removeFirst();
            failedQuery.unprepare();
          }

          pendingParseQueue.clear(); // No more ParseComplete messages expected.
          // Pending "describe" requests might be there in case of error
          // If that is the case, reset "described" status, so the statement is properly
          // described on next execution
          while (!pendingDescribeStatementQueue.isEmpty()) {
            DescribeRequest request = pendingDescribeStatementQueue.removeFirst();
            LOGGER.log(Level.FINEST, " FE marking setStatementDescribed(false) for query {0}", request.query);
            request.query.setStatementDescribed(false);
          }
          while (!pendingDescribePortalQueue.isEmpty()) {
            SimpleQuery describePortalQuery = pendingDescribePortalQueue.removeFirst();
            LOGGER.log(Level.FINEST, " FE marking setPortalDescribed(false) for query {0}", describePortalQuery);
            describePortalQuery.setPortalDescribed(false);
          }
          pendingBindQueue.clear(); // No more BindComplete messages expected.
          pendingExecuteQueue.clear(); // No more query executions expected.
          break;

        case 'G': // CopyInResponse
          LOGGER.log(Level.FINEST, " <=BE CopyInResponse");
          LOGGER.log(Level.FINEST, " FE=> CopyFail");

          // COPY sub-protocol is not implemented yet
          // We'll send a CopyFail message for COPY FROM STDIN so that
          // server does not wait for the data.
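          // The CopyFail frame is the 'f' tag, an int32 length (4 for the
          // length word itself + message bytes + 1 for the trailing NUL),
          // and the error message as a NUL-terminated string.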

          byte[] buf = "COPY commands are only supported using the CopyManager API.".getBytes(StandardCharsets.US_ASCII);
          pgStream.sendChar('f');
          pgStream.sendInteger4(buf.length + 4 + 1);
          pgStream.send(buf);
          pgStream.sendChar(0);
          pgStream.flush();
          sendSync(); // send sync message
          skipMessage(); // skip the response message
          break;

        case 'H': // CopyOutResponse
          LOGGER.log(Level.FINEST, " <=BE CopyOutResponse");

          skipMessage();
          // In case of CopyOutResponse, we cannot abort data transfer,
          // so just throw an error and ignore CopyData messages
          handler.handleError(
              new PSQLException(GT.tr("COPY commands are only supported using the CopyManager API."),
                  PSQLState.NOT_IMPLEMENTED));
          break;

        case 'c': // CopyDone
          skipMessage();
          LOGGER.log(Level.FINEST, " <=BE CopyDone");
          break;

        case 'd': // CopyData
          skipMessage();
          LOGGER.log(Level.FINEST, " <=BE CopyData");
          break;

        default:
          throw new IOException("Unexpected packet type: " + c);
      }

    }
  }
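
The handler callbacks invoked above (handleResultRows, handleCommandStatus, handleWarning, handleError, secureProgress) are the only way results leave this loop. As a minimal sketch of the consuming side, assuming the ResultHandlerBase helper class from org.postgresql.core (which provides default implementations of the ResultHandler interface), a row-collecting handler might look like this; CollectingHandler is a hypothetical name used for illustration:

import java.util.ArrayList;
import java.util.List;

import org.postgresql.core.Field;
import org.postgresql.core.Query;
import org.postgresql.core.ResultCursor;
import org.postgresql.core.ResultHandlerBase;
import org.postgresql.core.Tuple;

// Hypothetical handler: collects every row that processResults() reports.
class CollectingHandler extends ResultHandlerBase {
  final List<Tuple> rows = new ArrayList<Tuple>();

  @Override
  public void handleResultRows(Query fromQuery, Field[] fields,
      List<Tuple> tuples, ResultCursor cursor) {
    // Invoked once per resultset (the 'T'/'C' and 's' cases above); a
    // non-null cursor means the portal was suspended and can be resumed.
    rows.addAll(tuples);
  }
}

Inside the driver, PgStatement builds handlers along these lines and passes them to QueryExecutor.execute(...); processResults() then fires the callbacks as the corresponding backend messages arrive.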