CopyOperationImpl processCopyResults()

in src/main/java/com/amazon/redshift/core/v3/CopyQueryExecutor.java [159:397]


  CopyOperationImpl processCopyResults(CopyOperationImpl op, boolean block)
      throws SQLException, IOException {

    /*
    * fixes issue #1592 where one thread closes the stream and another is reading it
     */
    if (pgStream.isClosed()) {
      throw new RedshiftException(GT.tr("RedshiftStream is closed",
        op.getClass().getName()), RedshiftState.CONNECTION_DOES_NOT_EXIST);
    }
    /*
    *  This is a hack as we should not end up here, but sometimes do with large copy operations.
     */
    if ( processingCopyResults.compareAndSet(false,true) == false ) {
    	if(RedshiftLogger.isEnable())    	
    		logger.log(LogLevel.INFO, "Ignoring request to process copy results, already processing");
      return null;
    }

    // put this all in a try, finally block and reset the processingCopyResults in the finally clause
    try {
      boolean endReceiving = false;
      SQLException error = null;
      SQLException errors = null;
      int len;

      while (!endReceiving && (block || pgStream.hasMessagePending())) {

        // There is a bug in the server's implementation of the copy
        // protocol. It returns command complete immediately upon
        // receiving the EOF marker in the binary protocol,
        // potentially before we've issued CopyDone. When we are not
        // blocking, we don't think we are done, so we hold off on
        // processing command complete and any subsequent messages
        // until we actually are done with the copy.
        //
        if (!block) {
          int c = pgStream.peekChar();
          if (c == 'C') {
            // CommandComplete
          	if(RedshiftLogger.isEnable())    	
          		logger.log(LogLevel.DEBUG, " <=BE CommandStatus, Ignored until CopyDone");
            break;
          }
        }

        int c = pgStream.receiveChar();
        switch (c) {

          case 'A': // Asynchronous Notify

          	if(RedshiftLogger.isEnable())    	
          		logger.log(LogLevel.DEBUG, " <=BE Asynchronous Notification while copying");

          	queryExecutor.receiveAsyncNotify();
            break;

          case 'N': // Notice Response
          	if(RedshiftLogger.isEnable())    	
          		logger.log(LogLevel.DEBUG, " <=BE Notification while copying");

          	queryExecutor.addWarning(queryExecutor.receiveNoticeResponse());
            break;

          case 'C': // Command Complete

            String status = queryExecutor.receiveCommandStatus();

            try {
              if (op == null) {
                throw new RedshiftException(GT
                    .tr("Received CommandComplete ''{0}'' without an active copy operation", status),
                    RedshiftState.OBJECT_NOT_IN_STATE);
              }
              op.handleCommandStatus(status);
            } catch (SQLException se) {
              error = se;
            }

            block = true;
            break;

          case 'E': // ErrorMessage (expected response to CopyFail)

            error = queryExecutor.receiveErrorResponse(false);
            // We've received the error and we now expect to receive
            // Ready for query, but we must block because it might still be
            // on the wire and not here yet.
            block = true;
            break;

          case 'G': // CopyInResponse

          	if(RedshiftLogger.isEnable())    	
          		logger.log(LogLevel.DEBUG, " <=BE CopyInResponse");

            if (op != null) {
              error = new RedshiftException(GT.tr("Got CopyInResponse from server during an active {0}",
                  op.getClass().getName()), RedshiftState.OBJECT_NOT_IN_STATE);
            }

            op = new CopyInImpl();
            initCopy(op);
            endReceiving = true;
            break;

          case 'H': // CopyOutResponse

          	if(RedshiftLogger.isEnable())    	
          		logger.log(LogLevel.DEBUG, " <=BE CopyOutResponse");

            if (op != null) {
              error = new RedshiftException(GT.tr("Got CopyOutResponse from server during an active {0}",
                  op.getClass().getName()), RedshiftState.OBJECT_NOT_IN_STATE);
            }

            op = new CopyOutImpl();
            initCopy(op);
            endReceiving = true;
            break;

          case 'W': // CopyBothResponse

          	if(RedshiftLogger.isEnable())    	
          		logger.log(LogLevel.DEBUG, " <=BE CopyBothResponse");

            if (op != null) {
              error = new RedshiftException(GT.tr("Got CopyBothResponse from server during an active {0}",
                  op.getClass().getName()), RedshiftState.OBJECT_NOT_IN_STATE);
            }

            op = new CopyDualImpl();
            initCopy(op);
            endReceiving = true;
            break;

          case 'd': // CopyData

          	if(RedshiftLogger.isEnable())    	
          		logger.log(LogLevel.DEBUG, " <=BE CopyData");

            len = pgStream.receiveInteger4() - 4;

            assert len > 0 : "Copy Data length must be greater than 4";

            byte[] buf = pgStream.receive(len);
            if (op == null) {
              error = new RedshiftException(GT.tr("Got CopyData without an active copy operation"),
                  RedshiftState.OBJECT_NOT_IN_STATE);
            } else if (!(op instanceof CopyOut)) {
              error = new RedshiftException(
                  GT.tr("Unexpected copydata from server for {0}", op.getClass().getName()),
                  RedshiftState.COMMUNICATION_ERROR);
            } else {
              op.handleCopydata(buf);
            }
            endReceiving = true;
            break;

          case 'c': // CopyDone (expected after all copydata received)

          	if(RedshiftLogger.isEnable())    	
          		logger.log(LogLevel.DEBUG, " <=BE CopyDone");

            len = pgStream.receiveInteger4() - 4;
            if (len > 0) {
              pgStream.receive(len); // not in specification; should never appear
            }

            if (!(op instanceof CopyOut)) {
              error = new RedshiftException("Got CopyDone while not copying from server",
                  RedshiftState.OBJECT_NOT_IN_STATE);
            }

            // keep receiving since we expect a CommandComplete
            block = true;
            break;
          case 'S': // Parameter Status
            try {
            	queryExecutor.receiveParameterStatus();
            } catch (SQLException e) {
              error = e;
              endReceiving = true;
            }
            break;

          case 'Z': // ReadyForQuery: After FE:CopyDone => BE:CommandComplete

          	queryExecutor.receiveRFQ();
            if (queryExecutor.hasLock(op)) {
            	queryExecutor.unlock(op);
            }
            op = null;
            endReceiving = true;
            break;

          // If the user sends a non-copy query, we've got to handle some additional things.
          //
          case 'T': // Row Description (response to Describe)
          	if(RedshiftLogger.isEnable())    	
          		logger.log(LogLevel.DEBUG, " <=BE RowDescription (during copy ignored)");

          	queryExecutor.skipMessage();
            break;

          case 'D': // DataRow
          	if(RedshiftLogger.isEnable())    	
          		logger.log(LogLevel.DEBUG, " <=BE DataRow (during copy ignored)");

          	queryExecutor.skipMessage();
            break;

          default:
            throw new IOException(
                GT.tr("Unexpected packet type during copy: {0}", Integer.toString(c)));
        }

        // Collect errors into a neat chain for completeness
        if (error != null) {
          if (errors != null) {
            error.setNextException(errors);
          }
          errors = error;
          error = null;
        }
      }

      if (errors != null) {
        throw errors;
      }
      return op;

    } finally {
      /*
      reset here in the finally block to make sure it really is cleared
       */
      processingCopyResults.set(false);
    }
  }