void _pqParseInput3()

in src/pgclient/src/interfaces/libpq/fe-protocol3.c [90:843]


void _pqParseInput3(void *_pCscStatementContext,int *piStop)
{
    struct _CscStatementContext *pCscStatementContext = (struct _CscStatementContext *)_pCscStatementContext;
	char		id;
	int			msgLength;
	int			avail;
	int			go_return = 0;
    PGconn *conn = pCscStatementContext->m_pgConn;
    MessageLoopState *pMessageLoopState = pCscStatementContext->m_pMessageLoopState;
    ClientSideCursorResult *pCscResult = (pMessageLoopState) ? pMessageLoopState->m_cscResult : NULL;
    struct _ClientSideCursorExecutor *pCscExecutor = conn->m_pCscExecutor;
    int cscStopThread;
	int iStreamingCursorMode = (pCscStatementContext && isStreamingCursorMode(pCscStatementContext->m_pCallerContext));
	int iStreamingCursorRows = (iStreamingCursorMode) 
									? pCscStatementContext->m_StreamingCursorInfo.m_streamingCursorRows
									: 0;
    

	/*
	 * Loop to parse successive complete messages available in the buffer.
	 */
	for (;;)
	{
        if(pMessageLoopState &&  pCscExecutor)
            cscStopThread = getCscStopThreadFlag(pCscExecutor);
        else
            cscStopThread = FALSE;

        if(pCscStatementContext && pCscStatementContext->m_calledFromCscThread && cscStopThread)
//               ( cscStopThread|| (pMessageLoopState && pMessageLoopState->endQuery))
        {
                break;
        }

		/*
		 * Read the buffered compressed data w/o blocking
		 */
		if (conn->zpqStream && pqReadPending(conn) && (conn->inBufSize - conn->inEnd > 0))
		{
			int rc = zpq_read(conn->zpqStream, conn->inBuffer + conn->inEnd, conn->inBufSize - conn->inEnd, true);

			if (rc > 0)
			{
				conn->inEnd += rc;
			}
		}
		/*
		 * Try to read a message.  First get the type code and length. Return
		 * if not enough data.
		 */
		conn->inCursor = conn->inStart;
		if (pqGetc(&id, conn))
			return;
		if (pqGetInt(&msgLength, 4, conn))
			return;

		/*
		 * Try to validate message type/length here.  A length less than 4 is
		 * definitely broken.  Large lengths should only be believed for a few
		 * message types.
		 */
		if (msgLength < 4)
		{
			handleSyncLoss(conn, id, msgLength);
			return;
		}
		if (msgLength > 30000 && !VALID_LONG_MESSAGE_TYPE(id))
		{
			handleSyncLoss(conn, id, msgLength);
			return;
		}

		/*
		 * Can't process if message body isn't all here yet.
		 */
		msgLength -= 4;
		avail = conn->inEnd - conn->inCursor;
		if ( id == 'Z' && avail <= 1)
		  conn->lastseg = true;
		if (avail < msgLength)
		{
			/*
			 * Before returning, enlarge the input buffer if needed to hold
			 * the whole message.  This is better than leaving it to
			 * pqReadData because we can avoid multiple cycles of realloc()
			 * when the message is large; also, we can implement a reasonable
			 * recovery strategy if we are unable to make the buffer big
			 * enough.
			 */
			if (pqCheckInBufferSpace(conn->inCursor + (size_t) msgLength,
									 conn))
			{
				/*
				 * XXX add some better recovery code... plan is to skip over
				 * the message using its length, then report an error. For the
				 * moment, just treat this like loss of sync (which indeed it
				 * might be!)
				 */
				handleSyncLoss(conn, id, msgLength);
			}
			return;
		}

		/*
		 * NOTIFY and NOTICE messages can happen in any state; always process
		 * them right away.
		 *
		 * Most other messages should only be processed while in BUSY state.
		 * (In particular, in READY state we hold off further parsing until
		 * the application collects the current PGresult.)
		 *
		 * However, if the state is IDLE then we got trouble; we need to deal
		 * with the unexpected message somehow.
		 *
		 * ParameterStatus ('S') messages are a special case: in IDLE state we
		 * must process 'em (this case could happen if a new value was adopted
		 * from config file due to SIGHUP), but otherwise we hold off until
		 * BUSY state.
		 */
		if (id == 'A')
		{
			if (getNotify(conn))
				return;
		}
		else if (id == 'N')
		{
			if (pqGetErrorNotice3(conn, false))
				return;
		}
		else if (conn->asyncStatus != PGASYNC_BUSY)
		{
			/* If not IDLE state, just wait ... */
			if (conn->asyncStatus != PGASYNC_IDLE)
				return;

			/*
			 * Unexpected message in IDLE state; need to recover somehow.
			 * ERROR messages are displayed using the notice processor;
			 * ParameterStatus is handled normally; anything else is just
			 * dropped on the floor after displaying a suitable warning
			 * notice.	(An ERROR is very possibly the backend telling us why
			 * it is about to close the connection, so we don't want to just
			 * discard it...)
			 */
			if (id == 'E')
			{
				if (_pqGetErrorNotice3(conn, false /* treat as notice */ , _pCscStatementContext))
					return;
			}
			else if (id == 'S')
			{
				if (getParameterStatus(conn))
					return;
			}
			else
			{
				pqInternalNotice(&conn->noticeHooks,
						"message type 0x%02x arrived from server while idle",
								 id);
				/* Discard the unexpected message */
				conn->inCursor += msgLength;
			}
		}
		else
		{
			/*
			 * In BUSY state, we can process everything.
			 */
			switch (id)
			{
				case 'C':		/* command complete */
					if (pqGets(&conn->workBuffer, conn))
						return;
					if (conn->result == NULL)
					{
						conn->result = PQmakeEmptyPGresult(conn,
														   PGRES_COMMAND_OK);
						if (!conn->result)
							return;
					}
					strncpy(conn->result->cmdStatus, conn->workBuffer.data,
							CMDSTATUS_LEN);
					conn->asyncStatus = PGASYNC_READY;

                    // Return result with CSC
                    if(pMessageLoopState &&  pCscExecutor)
                    {
                        if(pMessageLoopState->pgResult == NULL)
                            pMessageLoopState->pgResult = conn->result;

                        if(pCscStatementContext->m_calledFromCscThread)
                        {
                            // We need to make it null because, we already pass the result back to caller.
                            conn->result = NULL;

			                /* Set the state back to BUSY, allowing parsing to proceed. */
                            conn->asyncStatus = PGASYNC_BUSY;
                        }

                        if((pMessageLoopState->pgResult != NULL 
                             && pMessageLoopState->pgResult->tuples != NULL)
                             || pMessageLoopState->rowsInMemReturned)
                        { // There was a resultset.
                	        if(!(pMessageLoopState->rowsInMemReturned))
                	        {
                                if(pMessageLoopState->pgResult)
                                    pMessageLoopState->pgResult->m_cscResult = pCscResult;

                                if(pCscStatementContext->m_pResultHandlerCallbackFunc
                                    && pCscStatementContext->m_calledFromCscThread)
                                {
                                    // Return the rows
                                    (*pCscStatementContext->m_pResultHandlerCallbackFunc)(pCscStatementContext->m_pCallerContext,
                                                                                            pMessageLoopState->pgResult);
                                }
					        }

                            pMessageLoopState->executeIndex++;
                	        pCscResult = resetAfterOneResultReadFromServerFinishCsc(pMessageLoopState, pCscExecutor, pMessageLoopState->executeIndex, NULL);
                        }
                        else
                        if(pMessageLoopState->pgResult != NULL 
                                && pMessageLoopState->pgResult->resultStatus == PGRES_COMMAND_OK)
                        {

                            if(pCscStatementContext->m_pResultHandlerCallbackFunc
                                && pCscStatementContext->m_calledFromCscThread)
                            {
                                // Return the command status for non-result commands.
                                (*pCscStatementContext->m_pResultHandlerCallbackFunc)(pCscStatementContext->m_pCallerContext,
                                                                                        pMessageLoopState->pgResult);
                            }

                            pMessageLoopState->executeIndex++;
                	        pCscResult = resetAfterOneResultReadFromServerFinishCsc(pMessageLoopState, pCscExecutor, pMessageLoopState->executeIndex, NULL);
                        }
                    }
					else
					if(iStreamingCursorMode)
					{
						resetAfterOneResultReadFromServerFinishForStreamingCursor(pCscStatementContext, conn);
					}

					break;
				case 'E':		/* error return */
					if (_pqGetErrorNotice3(conn, true, _pCscStatementContext))
						return;
					conn->asyncStatus = PGASYNC_READY;
					break;
				case 'Z':		/* backend is ready for new query */
					if (getReadyForQuery(conn))
						return;
					conn->asyncStatus = PGASYNC_IDLE;

                    if(pCscExecutor && pMessageLoopState)
                    {
                        endOfQueryResponseCsc(pCscExecutor, pMessageLoopState);
                        pCscResult = NULL;
                    }
					else
					if(iStreamingCursorMode)
					{
						pCscStatementContext->m_StreamingCursorInfo.m_endOfStreamingCursorQuery = TRUE;
					}

					break;
				case 'I':		/* empty query */
					if (conn->result == NULL)
					{
						conn->result = PQmakeEmptyPGresult(conn,
														   PGRES_EMPTY_QUERY);
						if (!conn->result)
							return;
					}
					conn->asyncStatus = PGASYNC_READY;
					break;
				case '1':		/* Parse Complete */
					/* If we're doing PQprepare, we're done; else ignore */
					if (conn->queryclass == PGQUERY_PREPARE)
					{
						if (conn->result == NULL)
						{
							conn->result = PQmakeEmptyPGresult(conn,
														   PGRES_COMMAND_OK);
							if (!conn->result)
								return;
						}
						conn->asyncStatus = PGASYNC_READY;
					}
					break;
				case '2':		/* Bind Complete */
				case '3':		/* Close Complete */
					/* Nothing to do for these message types */
					break;
				case 'S':		/* parameter status */
					if (getParameterStatus(conn))
						return;
					break;
				case 'K':		/* secret key data from the backend */

					/*
					 * This is expected only during backend startup, but it's
					 * just as easy to handle it as part of the main loop.
					 * Save the data and continue processing.
					 */
					if (pqGetInt(&(conn->be_pid), 4, conn))
						return;
					if (pqGetInt(&(conn->be_key), 4, conn))
						return;
					break;
				case 'T':		/* Row Description */
					if (conn->result == NULL ||
						conn->queryclass == PGQUERY_DESCRIBE)
					{
						if(iStreamingCursorMode)
						{
							// We need to make it null because, we already pass the result back to caller.
							if(pCscStatementContext->m_StreamingCursorInfo.m_streamResultBatchNumber > 0)
							{
								// Indicate end of the rows
								pCscStatementContext->m_StreamingCursorInfo.m_endOfStreamingCursor = TRUE;

								// Reset after one result read
								conn->result = NULL;
							}
						}

						/* First 'T' in a query sequence */
						if (getRowDescriptions(conn))
							return;

						/*
						 * If we're doing a Describe, we're ready to pass the
						 * result back to the client.
						 */
						if (conn->queryclass == PGQUERY_DESCRIBE)
							conn->asyncStatus = PGASYNC_READY;

                        if(pMessageLoopState &&  pCscExecutor)
                        {
                            if(pMessageLoopState->pgResult == NULL)
                                pMessageLoopState->pgResult = conn->result;

                            if((pMessageLoopState->pgResult != NULL 
                                 && pMessageLoopState->pgResult->tuples != NULL)
                                 || pMessageLoopState->rowsInMemReturned)
                            { // There was a resultset.
                    	        if(!(pMessageLoopState->rowsInMemReturned))
                    	        {
                                    if(pMessageLoopState->pgResult)
                                        pMessageLoopState->pgResult->m_cscResult = pCscResult;

                                    if(pCscStatementContext->m_pResultHandlerCallbackFunc
                                        && pCscStatementContext->m_calledFromCscThread)
                                    {
                                        // Return the rows
                                        (*pCscStatementContext->m_pResultHandlerCallbackFunc)(pCscStatementContext->m_pCallerContext,
                                                                                                pMessageLoopState->pgResult);
                                    }
						        }

                    	        pCscResult = resetAfterOneResultReadFromServerFinishCsc(pMessageLoopState, pCscExecutor, pMessageLoopState->executeIndex + 1, NULL);
                            }
                        }
					}
					else
					{
						/*
						 * A new 'T' message is treated as the start of
						 * another PGresult.  (It is not clear that this is
						 * really possible with the current backend.) We stop
						 * parsing until the application accepts the current
						 * result.
						 */
						conn->asyncStatus = PGASYNC_READY;
						return;
					}
					break;
				case 'n':		/* No Data */

					/*
					 * NoData indicates that we will not be seeing a
					 * RowDescription message because the statement or portal
					 * inquired about doesn't return rows.
					 *
					 * If we're doing a Describe, we have to pass something
					 * back to the client, so set up a COMMAND_OK result,
					 * instead of TUPLES_OK.  Otherwise we can just ignore
					 * this message.
					 */
					if (conn->queryclass == PGQUERY_DESCRIBE)
					{
						if(iStreamingCursorMode)
						{
							resetAfterOneResultReadFromServerFinishForStreamingCursor(pCscStatementContext, conn);
						}

						if (conn->result == NULL)
						{
							conn->result = PQmakeEmptyPGresult(conn,
														   PGRES_COMMAND_OK);
							if (!conn->result)
								return;
						}

                        if(pMessageLoopState &&  pCscExecutor)
                        {
                            if(pMessageLoopState->pgResult == NULL)
                                pMessageLoopState->pgResult = conn->result;

                            if(pCscStatementContext->m_calledFromCscThread)
                            {
                                // We need to make it null because, we already pass the result back to caller.
                                conn->result = NULL;
                            } 

                            if((pMessageLoopState->pgResult != NULL 
                                 && pMessageLoopState->pgResult->tuples != NULL)
                                 || pMessageLoopState->rowsInMemReturned)
                            { // There was a resultset.
                    	        if(!(pMessageLoopState->rowsInMemReturned))
                    	        {
                                    if(pMessageLoopState->pgResult)
                                        pMessageLoopState->pgResult->m_cscResult = pCscResult;

                                    if(pCscStatementContext->m_pResultHandlerCallbackFunc
                                        && pCscStatementContext->m_calledFromCscThread)
                                    {
                                        // Return the rows
                                        (*pCscStatementContext->m_pResultHandlerCallbackFunc)(pCscStatementContext->m_pCallerContext,
                                                                                                pMessageLoopState->pgResult);
                                    }
						        }

                                pMessageLoopState->executeIndex++;
                    	        pCscResult = resetAfterOneResultReadFromServerFinishCsc(pMessageLoopState, pCscExecutor, pMessageLoopState->executeIndex, NULL);
                            }
                        }

						conn->asyncStatus = PGASYNC_READY;
					}
					break;
				case 't':		/* Parameter Description */
					if (getParamDescriptions(conn))
						return;
					break;

				case 'D':		/* Data Row */
                {
            	    int skipRow = FALSE; 
            	    int createThread = FALSE;
                    PGresAttValue *tuple = NULL;

                    // Save result block status before reading row. 
                    // We have to free and restore status after writing row to disk.
                    PGresAttValue *_curTuple = NULL;
	                PGresult_data *_curBlock = NULL;	/* most recently allocated block */
	                PGresult_data *_nextBlock = NULL;	/* most recently allocated block */
	                int			_curOffset = 0;		/* start offset of free space in block */
	                int			_spaceLeft = 0;		/* number of free bytes remaining in block */
                    int         iSavedResultBlockStatus = FALSE; // Just indicate whether we save the status or not.
                    PGresult   *result; 

					if (conn->result != NULL &&
						conn->result->resultStatus == PGRES_TUPLES_OK)
					{
	        		    if(pCscResult != NULL && pMessageLoopState)
	        		    {
	        			    skipRow = pMessageLoopState->m_skipAllResults
	    						    || ((pMessageLoopState->m_skipResultNumber != 0) 
                                                && (pMessageLoopState->m_skipResultNumber == (pMessageLoopState->executeIndex + 1)))
	    						    || isSkipResultCsc(pCscResult)
	    						    || doesMaxRowsReachCsc(pCscResult);
    	        			
	        			    if(skipRow)
	        			    {
                                if(IS_TRACE_ON_CSC())
	                            {
	                                traceInfoCsc("skipRow is true. msgLoopState.m_skipAllResults = %d ((msgLoopState.m_skipResultNumber != 0) && (msgLoopState.m_skipResultNumber == (msgLoopState.executeIndex + 1))) = %d cscResult.isSkipResult() = %d cscResult.doesMaxRowsReach() = %d",
                                                        pMessageLoopState->m_skipAllResults,
                                                        ((pMessageLoopState->m_skipResultNumber != 0) && (pMessageLoopState->m_skipResultNumber == (pMessageLoopState->executeIndex + 1))) ,
                                                        isSkipResultCsc(pCscResult),
                                                        doesMaxRowsReachCsc(pCscResult));
	                            }
	        			    }  
	        		    }
						else
						if(iStreamingCursorMode)
						{
							skipRow = pCscStatementContext->m_StreamingCursorInfo.m_skipStreamingCursor;
						}

                	    if (!skipRow)
                        {
                            if(pCscResult && doesThresholdReachCsc(pCscResult))
                            {
                                // Save the result block status
                                result = conn->result;
                                _curTuple = conn->curTuple;
                                _curBlock = result->curBlock;
                                _curOffset = result->curOffset;
                                _spaceLeft = result->spaceLeft;
                                _nextBlock = result->curBlock->next;
                                iSavedResultBlockStatus = TRUE;
                            }

						    /* Read another tuple of a normal query response */
                            if (getAnotherTuple(conn, msgLength,(pCscResult) ? &tuple : NULL, iStreamingCursorRows))
							    return;
                        }
                        else
                        {
						    /* Skip another tuple of a normal query response */
						    if (skipAnotherTuple(conn, msgLength))
							    return;
                        }

						if (conn->output_nbytes && (conn->result->mem_used >= conn->output_nbytes)) 
						{
						  go_return = 1;
						  break;
						}

                        if(!skipRow)
                        {
                            if(pCscResult != NULL && pMessageLoopState)
                            {
                    	        // Add row count
                    	        incRowCountAndCheckForMaxRowsCsc(pCscResult);

                                setRawRowLengthCsc(pCscResult, msgLength);
                            	
	                            if(!doesThresholdReachCsc(pCscResult))
	                            {
	                    	        int reachThresholdLimit = checkForThresholdLimitReachCsc(pCscResult,0, getRawRowLengthCsc(pCscResult), TRUE);
        	                    	
	                    	        if(reachThresholdLimit)
	                    	        {
	                    		        // Store existing rows in tuples on disk
	                    		        createFileCsc(pCscResult, conn->result);

	                    		        // Now onward store rows on disk without doing calculation because threshold limit is reached.
	                    		        setThresholdReachCsc(pCscResult, TRUE);
        	                    		
	                    		        // Create new thread to read rest of rows from socket
	                    		        createThread = pMessageLoopState->multiThreads;
                			        }
	                            }
                            }

                            if(pCscResult != NULL && doesThresholdReachCsc(pCscResult))
                            {
                		        // Store current row on disk
                    	        writeRowCsc(pCscResult, tuple, conn->result->numAttributes);

                                // Free the tuple, by restoring block status before the row read.
                                if(iSavedResultBlockStatus)
                                {
                                    PGresult_data *block;

                                    iSavedResultBlockStatus = FALSE;

                                    // Restore the result block status
                                    result = conn->result;

                                    conn->curTuple = _curTuple;

                                    // Free any new block(s) added in front
                                    while ((block = result->curBlock) != _curBlock)
                                    {
                                        if(block)
                                        {
	                                        result->curBlock = block->next;
	                                        free(block);
                                        }
                                        else
                                            break;
                                    }

                                    if(result->curBlock)
                                    {
                                        // Free any new block(s) added at next
                                        while ((block = result->curBlock->next) != _nextBlock)
                                        {
                                            if(block)
                                            {
	                                            result->curBlock->next = block->next;
	                                            free(block);
                                            }
                                            else
                                                break;
                                        }
                                    }

                                    // Restore cur block status
                                    result->curOffset = _curOffset;
                                    result->spaceLeft = _spaceLeft;
                                }
                            }
                            else
                            {
                		        // Store current row in memory
                                if(tuple)
								{
                                    addAnotherTuple(conn, msgLength, tuple, 0);
								}
								else
								{
									// Is streaming cursor
									if(iStreamingCursorMode)
									{
										if(conn->result 
											&& 	conn->result->totalntups == pCscStatementContext->m_StreamingCursorInfo.m_streamingCursorRows)
										{
											/* Normal case: parsing agrees with specified length */
											conn->inStart = conn->inCursor;

                    						// Return to break the loop
											if(piStop)
												*piStop = TRUE;

											(pCscStatementContext->m_StreamingCursorInfo.m_streamResultBatchNumber)++;

                    						return;
										}
									} // Streaming cursor
								}
                            }
                            
                        } // !skip

                        if(createThread)
                        {
                            int threadCreated;
                            int savInStart = conn->inStart;
                            
                            pCscStatementContext->m_cscThreadCreated = TRUE;

		                    /* Normal case: parsing agrees with specified length */
		                    conn->inStart = conn->inCursor;
                            
                            threadCreated = createProcessingThreadCsc(pCscExecutor, pMessageLoopState, pCscResult,
                                                                            conn->result, pCscStatementContext);

                            if(threadCreated)
                            {
                    	        // Return to break the loop

                                if(piStop)
                                    *piStop = TRUE;

                    	        return;
                            }
                            else
                            {
                                conn->inStart = savInStart;
                                pCscStatementContext->m_cscThreadCreated = FALSE;
                            }
                        	
                        } // Create thread
					}
					else if (conn->result != NULL &&
							 conn->result->resultStatus == PGRES_FATAL_ERROR)
					{
						/*
						 * We've already choked for some reason.  Just discard
						 * tuples till we get to the end of the query.
						 */
						conn->inCursor += msgLength;
					}
					else
					{
						/* Set up to report error at end of query */
						printfPQExpBuffer(&conn->errorMessage,
										  libpq_gettext("server sent data (\"D\" message) without prior row description (\"T\" message)\n"));
						pqSaveErrorResult(conn);
						/* Discard the unexpected message */
						conn->inCursor += msgLength;
					}
					break;
                }

				case 'G':		/* Start Copy In */
					if (getCopyStart(conn, PGRES_COPY_IN))
						return;
					conn->asyncStatus = PGASYNC_COPY_IN;
					break;
				case 'H':		/* Start Copy Out */
					if (getCopyStart(conn, PGRES_COPY_OUT))
						return;
					conn->asyncStatus = PGASYNC_COPY_OUT;
					conn->copy_already_done = 0;
					break;
				case 'W':		/* Start Copy Both */
					if (getCopyStart(conn, PGRES_COPY_BOTH))
						return;
					conn->asyncStatus = PGASYNC_COPY_BOTH;
					conn->copy_already_done = 0;
					break;
				case 'd':		/* Copy Data */

					/*
					 * If we see Copy Data, just silently drop it.	This would
					 * only occur if application exits COPY OUT mode too
					 * early.
					 */
					conn->inCursor += msgLength;
					break;
				case 'c':		/* Copy Done */

					/*
					 * If we see Copy Done, just silently drop it.	This is
					 * the normal case during PQendcopy.  We will keep
					 * swallowing data, expecting to see command-complete for
					 * the COPY command.
					 */
					break;
				default:
					printfPQExpBuffer(&conn->errorMessage,
									  libpq_gettext(
													"unexpected response from server; first received character was \"%c\"\n"),
									  id);
					/* build an error result holding the error message */
					pqSaveErrorResult(conn);
					/* not sure if we will see more, so go to ready state */
					conn->asyncStatus = PGASYNC_READY;
					/* Discard the unexpected message */
					conn->inCursor += msgLength;
					break;
			}					/* switch on protocol character */
		}
		/* Successfully consumed this message */
		if (conn->inCursor == conn->inStart + 5 + msgLength)
		{
			/* Normal case: parsing agrees with specified length */
			conn->inStart = conn->inCursor;
		}
		else
		{
			/* Trouble --- report it */
			printfPQExpBuffer(&conn->errorMessage,
							  libpq_gettext("message contents do not agree with length in message type \"%c\"\n"),
							  id);
			/* build an error result holding the error message */
			pqSaveErrorResult(conn);
			conn->asyncStatus = PGASYNC_READY;
			/* trust the specified message length as what to skip */
			conn->inStart += 5 + msgLength;
		}

		if (go_return)
		  return;
	} // Message loop
}