in src/pgclient/src/interfaces/libpq/fe-protocol3.c [90:843]
/*
 * _pqParseInput3: parse protocol-3 messages that are already present in the
 * connection's input buffer, one complete message per loop iteration.
 *
 * _pCscStatementContext: opaque pointer to a struct _CscStatementContext.  It
 *      supplies the PGconn (m_pgConn), the optional client-side-cursor (CSC)
 *      message loop state, the result-handler callback and the
 *      streaming-cursor bookkeeping used below.
 * piStop: optional out-parameter (may be NULL).  It is set to TRUE only when
 *      this function returns because a streaming-cursor batch has reached its
 *      requested row count, or because a CSC processing thread was created to
 *      read the remaining rows.
 *
 * The function returns nothing.  It returns early whenever a message is not
 * completely buffered yet, when a helper reports that it needs more data,
 * when loss of sync is detected, or when parsing must pause until the
 * application collects the current result.
 *
 * NOTE(review): the context pointer is dereferenced unconditionally to obtain
 * m_pgConn, so callers must pass a non-NULL context even though some later
 * expressions still test the pointer.
 */
void _pqParseInput3(void *_pCscStatementContext,int *piStop)
{
    struct _CscStatementContext *pCscStatementContext = (struct _CscStatementContext *)_pCscStatementContext;
    char id;
    int msgLength;
    int avail;
    int go_return = 0;  /* set when the output byte budget is reached */
    PGconn *conn = pCscStatementContext->m_pgConn;
    MessageLoopState *pMessageLoopState = pCscStatementContext->m_pMessageLoopState;
    ClientSideCursorResult *pCscResult = (pMessageLoopState) ? pMessageLoopState->m_cscResult : NULL;
    struct _ClientSideCursorExecutor *pCscExecutor = conn->m_pCscExecutor;
    int cscStopThread;
    int iStreamingCursorMode = (pCscStatementContext && isStreamingCursorMode(pCscStatementContext->m_pCallerContext));
    int iStreamingCursorRows = (iStreamingCursorMode)
        ? pCscStatementContext->m_StreamingCursorInfo.m_streamingCursorRows
        : 0;

    /*
     * Loop to parse successive complete messages available in the buffer.
     */
    for (;;)
    {
        /* The stop flag is only consulted when CSC state is present. */
        if(pMessageLoopState && pCscExecutor)
            cscStopThread = getCscStopThreadFlag(pCscExecutor);
        else
            cscStopThread = FALSE;

        /* Only a call made from the CSC thread honors the stop request. */
        if(pCscStatementContext && pCscStatementContext->m_calledFromCscThread && cscStopThread)
        {
            break;
        }

        /*
         * Read the buffered compressed data without blocking, but only when
         * there is free room left at the end of the input buffer.
         */
        if (conn->zpqStream && pqReadPending(conn) && (conn->inBufSize - conn->inEnd > 0))
        {
            int rc = zpq_read(conn->zpqStream, conn->inBuffer + conn->inEnd, conn->inBufSize - conn->inEnd, true);

            if (rc > 0)
            {
                conn->inEnd += rc;
            }
        }

        /*
         * Try to read a message.  First get the type code and length.  Return
         * if not enough data.
         */
        conn->inCursor = conn->inStart;
        if (pqGetc(&id, conn))
            return;
        if (pqGetInt(&msgLength, 4, conn))
            return;

        /*
         * Try to validate message type/length here.  A length less than 4 is
         * definitely broken.  Large lengths should only be believed for a few
         * message types.
         */
        if (msgLength < 4)
        {
            handleSyncLoss(conn, id, msgLength);
            return;
        }
        if (msgLength > 30000 && !VALID_LONG_MESSAGE_TYPE(id))
        {
            handleSyncLoss(conn, id, msgLength);
            return;
        }

        /*
         * Can't process if message body isn't all here yet.  The length word
         * counts itself, so subtract it to get the body length.
         */
        msgLength -= 4;
        avail = conn->inEnd - conn->inCursor;

        /* ReadyForQuery with at most one byte buffered after the header. */
        if ( id == 'Z' && avail <= 1)
            conn->lastseg = true;

        if (avail < msgLength)
        {
            /*
             * Before returning, enlarge the input buffer if needed to hold
             * the whole message.  This is better than leaving it to
             * pqReadData because we can avoid multiple cycles of realloc()
             * when the message is large; also, we can implement a reasonable
             * recovery strategy if we are unable to make the buffer big
             * enough.
             */
            if (pqCheckInBufferSpace(conn->inCursor + (size_t) msgLength,
                                     conn))
            {
                /*
                 * XXX add some better recovery code... plan is to skip over
                 * the message using its length, then report an error.  For
                 * the moment, just treat this like loss of sync (which indeed
                 * it might be!)
                 */
                handleSyncLoss(conn, id, msgLength);
            }
            return;
        }

        /*
         * NOTIFY and NOTICE messages can happen in any state; always process
         * them right away.
         *
         * Most other messages should only be processed while in BUSY state.
         * (In particular, in READY state we hold off further parsing until
         * the application collects the current PGresult.)
         *
         * However, if the state is IDLE then we got trouble; we need to deal
         * with the unexpected message somehow.
         *
         * ParameterStatus ('S') messages are a special case: in IDLE state we
         * must process 'em (this case could happen if a new value was adopted
         * from config file due to SIGHUP), but otherwise we hold off until
         * BUSY state.
         */
        if (id == 'A')
        {
            if (getNotify(conn))
                return;
        }
        else if (id == 'N')
        {
            if (pqGetErrorNotice3(conn, false))
                return;
        }
        else if (conn->asyncStatus != PGASYNC_BUSY)
        {
            /* If not IDLE state, just wait ... */
            if (conn->asyncStatus != PGASYNC_IDLE)
                return;

            /*
             * Unexpected message in IDLE state; need to recover somehow.
             * ERROR messages are displayed using the notice processor;
             * ParameterStatus is handled normally; anything else is just
             * dropped on the floor after displaying a suitable warning
             * notice.  (An ERROR is very possibly the backend telling us why
             * it is about to close the connection, so we don't want to just
             * discard it...)
             */
            if (id == 'E')
            {
                if (_pqGetErrorNotice3(conn, false /* treat as notice */ , _pCscStatementContext))
                    return;
            }
            else if (id == 'S')
            {
                if (getParameterStatus(conn))
                    return;
            }
            else
            {
                pqInternalNotice(&conn->noticeHooks,
                                 "message type 0x%02x arrived from server while idle",
                                 id);
                /* Discard the unexpected message */
                conn->inCursor += msgLength;
            }
        }
        else
        {
            /*
             * In BUSY state, we can process everything.
             */
            switch (id)
            {
                case 'C':       /* command complete */
                    if (pqGets(&conn->workBuffer, conn))
                        return;
                    if (conn->result == NULL)
                    {
                        conn->result = PQmakeEmptyPGresult(conn,
                                                           PGRES_COMMAND_OK);
                        if (!conn->result)
                            return;
                    }
                    strncpy(conn->result->cmdStatus, conn->workBuffer.data,
                            CMDSTATUS_LEN);
                    /*
                     * strncpy() does not NUL-terminate when the source is
                     * CMDSTATUS_LEN bytes or longer, so terminate explicitly
                     * in the last byte it wrote.
                     */
                    conn->result->cmdStatus[CMDSTATUS_LEN - 1] = '\0';
                    conn->asyncStatus = PGASYNC_READY;

                    /* Return result with CSC */
                    if(pMessageLoopState && pCscExecutor)
                    {
                        if(pMessageLoopState->pgResult == NULL)
                            pMessageLoopState->pgResult = conn->result;

                        if(pCscStatementContext->m_calledFromCscThread)
                        {
                            /* The result now belongs to the loop state; detach it from conn. */
                            conn->result = NULL;
                            /* Set the state back to BUSY, allowing parsing to proceed. */
                            conn->asyncStatus = PGASYNC_BUSY;
                        }

                        if((pMessageLoopState->pgResult != NULL
                                && pMessageLoopState->pgResult->tuples != NULL)
                            || pMessageLoopState->rowsInMemReturned)
                        {
                            /* There was a resultset. */
                            if(!(pMessageLoopState->rowsInMemReturned))
                            {
                                if(pMessageLoopState->pgResult)
                                    pMessageLoopState->pgResult->m_cscResult = pCscResult;

                                if(pCscStatementContext->m_pResultHandlerCallbackFunc
                                    && pCscStatementContext->m_calledFromCscThread)
                                {
                                    /* Return the rows */
                                    (*pCscStatementContext->m_pResultHandlerCallbackFunc)(pCscStatementContext->m_pCallerContext,
                                                                                         pMessageLoopState->pgResult);
                                }
                            }

                            pMessageLoopState->executeIndex++;
                            pCscResult = resetAfterOneResultReadFromServerFinishCsc(pMessageLoopState, pCscExecutor, pMessageLoopState->executeIndex, NULL);
                        }
                        else if(pMessageLoopState->pgResult != NULL
                                && pMessageLoopState->pgResult->resultStatus == PGRES_COMMAND_OK)
                        {
                            if(pCscStatementContext->m_pResultHandlerCallbackFunc
                                && pCscStatementContext->m_calledFromCscThread)
                            {
                                /* Return the command status for non-result commands. */
                                (*pCscStatementContext->m_pResultHandlerCallbackFunc)(pCscStatementContext->m_pCallerContext,
                                                                                     pMessageLoopState->pgResult);
                            }

                            pMessageLoopState->executeIndex++;
                            pCscResult = resetAfterOneResultReadFromServerFinishCsc(pMessageLoopState, pCscExecutor, pMessageLoopState->executeIndex, NULL);
                        }
                    }
                    else if(iStreamingCursorMode)
                    {
                        resetAfterOneResultReadFromServerFinishForStreamingCursor(pCscStatementContext, conn);
                    }
                    break;

                case 'E':       /* error return */
                    if (_pqGetErrorNotice3(conn, true, _pCscStatementContext))
                        return;
                    conn->asyncStatus = PGASYNC_READY;
                    break;

                case 'Z':       /* backend is ready for new query */
                    if (getReadyForQuery(conn))
                        return;
                    conn->asyncStatus = PGASYNC_IDLE;
                    if(pCscExecutor && pMessageLoopState)
                    {
                        endOfQueryResponseCsc(pCscExecutor, pMessageLoopState);
                        pCscResult = NULL;
                    }
                    else if(iStreamingCursorMode)
                    {
                        pCscStatementContext->m_StreamingCursorInfo.m_endOfStreamingCursorQuery = TRUE;
                    }
                    break;

                case 'I':       /* empty query */
                    if (conn->result == NULL)
                    {
                        conn->result = PQmakeEmptyPGresult(conn,
                                                           PGRES_EMPTY_QUERY);
                        if (!conn->result)
                            return;
                    }
                    conn->asyncStatus = PGASYNC_READY;
                    break;

                case '1':       /* Parse Complete */
                    /* If we're doing PQprepare, we're done; else ignore */
                    if (conn->queryclass == PGQUERY_PREPARE)
                    {
                        if (conn->result == NULL)
                        {
                            conn->result = PQmakeEmptyPGresult(conn,
                                                               PGRES_COMMAND_OK);
                            if (!conn->result)
                                return;
                        }
                        conn->asyncStatus = PGASYNC_READY;
                    }
                    break;

                case '2':       /* Bind Complete */
                case '3':       /* Close Complete */
                    /* Nothing to do for these message types */
                    break;

                case 'S':       /* parameter status */
                    if (getParameterStatus(conn))
                        return;
                    break;

                case 'K':       /* secret key data from the backend */
                    /*
                     * This is expected only during backend startup, but it's
                     * just as easy to handle it as part of the main loop.
                     * Save the data and continue processing.
                     */
                    if (pqGetInt(&(conn->be_pid), 4, conn))
                        return;
                    if (pqGetInt(&(conn->be_key), 4, conn))
                        return;
                    break;

                case 'T':       /* Row Description */
                    if (conn->result == NULL ||
                        conn->queryclass == PGQUERY_DESCRIBE)
                    {
                        if(iStreamingCursorMode)
                        {
                            /*
                             * If at least one batch was already handed back,
                             * that result has been passed to the caller:
                             * flag the end of its rows and detach it.
                             */
                            if(pCscStatementContext->m_StreamingCursorInfo.m_streamResultBatchNumber > 0)
                            {
                                /* Indicate end of the rows */
                                pCscStatementContext->m_StreamingCursorInfo.m_endOfStreamingCursor = TRUE;
                                /* Reset after one result read */
                                conn->result = NULL;
                            }
                        }

                        /* First 'T' in a query sequence */
                        if (getRowDescriptions(conn))
                            return;

                        /*
                         * If we're doing a Describe, we're ready to pass the
                         * result back to the client.
                         */
                        if (conn->queryclass == PGQUERY_DESCRIBE)
                            conn->asyncStatus = PGASYNC_READY;

                        if(pMessageLoopState && pCscExecutor)
                        {
                            if(pMessageLoopState->pgResult == NULL)
                                pMessageLoopState->pgResult = conn->result;

                            if((pMessageLoopState->pgResult != NULL
                                    && pMessageLoopState->pgResult->tuples != NULL)
                                || pMessageLoopState->rowsInMemReturned)
                            {
                                /* There was a resultset. */
                                if(!(pMessageLoopState->rowsInMemReturned))
                                {
                                    if(pMessageLoopState->pgResult)
                                        pMessageLoopState->pgResult->m_cscResult = pCscResult;

                                    if(pCscStatementContext->m_pResultHandlerCallbackFunc
                                        && pCscStatementContext->m_calledFromCscThread)
                                    {
                                        /* Return the rows */
                                        (*pCscStatementContext->m_pResultHandlerCallbackFunc)(pCscStatementContext->m_pCallerContext,
                                                                                             pMessageLoopState->pgResult);
                                    }
                                }

                                /* Unlike 'C' and 'n', executeIndex itself is not advanced here. */
                                pCscResult = resetAfterOneResultReadFromServerFinishCsc(pMessageLoopState, pCscExecutor, pMessageLoopState->executeIndex + 1, NULL);
                            }
                        }
                    }
                    else
                    {
                        /*
                         * A new 'T' message is treated as the start of
                         * another PGresult.  (It is not clear that this is
                         * really possible with the current backend.)  We stop
                         * parsing until the application accepts the current
                         * result.
                         */
                        conn->asyncStatus = PGASYNC_READY;
                        return;
                    }
                    break;

                case 'n':       /* No Data */
                    /*
                     * NoData indicates that we will not be seeing a
                     * RowDescription message because the statement or portal
                     * inquired about doesn't return rows.
                     *
                     * If we're doing a Describe, we have to pass something
                     * back to the client, so set up a COMMAND_OK result,
                     * instead of TUPLES_OK.  Otherwise we can just ignore
                     * this message.
                     */
                    if (conn->queryclass == PGQUERY_DESCRIBE)
                    {
                        if(iStreamingCursorMode)
                        {
                            resetAfterOneResultReadFromServerFinishForStreamingCursor(pCscStatementContext, conn);
                        }

                        if (conn->result == NULL)
                        {
                            conn->result = PQmakeEmptyPGresult(conn,
                                                               PGRES_COMMAND_OK);
                            if (!conn->result)
                                return;
                        }

                        if(pMessageLoopState && pCscExecutor)
                        {
                            if(pMessageLoopState->pgResult == NULL)
                                pMessageLoopState->pgResult = conn->result;

                            if(pCscStatementContext->m_calledFromCscThread)
                            {
                                /* The result now belongs to the loop state; detach it from conn. */
                                conn->result = NULL;
                            }

                            if((pMessageLoopState->pgResult != NULL
                                    && pMessageLoopState->pgResult->tuples != NULL)
                                || pMessageLoopState->rowsInMemReturned)
                            {
                                /* There was a resultset. */
                                if(!(pMessageLoopState->rowsInMemReturned))
                                {
                                    if(pMessageLoopState->pgResult)
                                        pMessageLoopState->pgResult->m_cscResult = pCscResult;

                                    if(pCscStatementContext->m_pResultHandlerCallbackFunc
                                        && pCscStatementContext->m_calledFromCscThread)
                                    {
                                        /* Return the rows */
                                        (*pCscStatementContext->m_pResultHandlerCallbackFunc)(pCscStatementContext->m_pCallerContext,
                                                                                             pMessageLoopState->pgResult);
                                    }
                                }

                                pMessageLoopState->executeIndex++;
                                pCscResult = resetAfterOneResultReadFromServerFinishCsc(pMessageLoopState, pCscExecutor, pMessageLoopState->executeIndex, NULL);
                            }
                        }

                        conn->asyncStatus = PGASYNC_READY;
                    }
                    break;

                case 't':       /* Parameter Description */
                    if (getParamDescriptions(conn))
                        return;
                    break;

                case 'D':       /* Data Row */
                    {
                        int skipRow = FALSE;
                        int createThread = FALSE;
                        PGresAttValue *tuple = NULL;
                        /*
                         * Snapshot of the result's block allocator taken
                         * before a row is read, so the memory used by that
                         * row can be released again once the row has been
                         * written to disk.
                         */
                        PGresAttValue *_curTuple = NULL;
                        PGresult_data *_curBlock = NULL;    /* most recently allocated block */
                        PGresult_data *_nextBlock = NULL;   /* block following _curBlock */
                        int _curOffset = 0;                 /* start offset of free space in block */
                        int _spaceLeft = 0;                 /* number of free bytes remaining in block */
                        int iSavedResultBlockStatus = FALSE;    /* TRUE once the snapshot above is valid */
                        PGresult *result;

                        if (conn->result != NULL &&
                            conn->result->resultStatus == PGRES_TUPLES_OK)
                        {
                            /* Decide whether this row is kept or discarded. */
                            if(pCscResult != NULL && pMessageLoopState)
                            {
                                skipRow = pMessageLoopState->m_skipAllResults
                                    || ((pMessageLoopState->m_skipResultNumber != 0)
                                        && (pMessageLoopState->m_skipResultNumber == (pMessageLoopState->executeIndex + 1)))
                                    || isSkipResultCsc(pCscResult)
                                    || doesMaxRowsReachCsc(pCscResult);

                                if(skipRow)
                                {
                                    if(IS_TRACE_ON_CSC())
                                    {
                                        traceInfoCsc("skipRow is true. msgLoopState.m_skipAllResults = %d ((msgLoopState.m_skipResultNumber != 0) && (msgLoopState.m_skipResultNumber == (msgLoopState.executeIndex + 1))) = %d cscResult.isSkipResult() = %d cscResult.doesMaxRowsReach() = %d",
                                                     pMessageLoopState->m_skipAllResults,
                                                     ((pMessageLoopState->m_skipResultNumber != 0) && (pMessageLoopState->m_skipResultNumber == (pMessageLoopState->executeIndex + 1))) ,
                                                     isSkipResultCsc(pCscResult),
                                                     doesMaxRowsReachCsc(pCscResult));
                                    }
                                }
                            }
                            else if(iStreamingCursorMode)
                            {
                                skipRow = pCscStatementContext->m_StreamingCursorInfo.m_skipStreamingCursor;
                            }

                            if (!skipRow)
                            {
                                if(pCscResult && doesThresholdReachCsc(pCscResult))
                                {
                                    /* Save the result block status */
                                    result = conn->result;
                                    _curTuple = conn->curTuple;
                                    _curBlock = result->curBlock;
                                    _curOffset = result->curOffset;
                                    _spaceLeft = result->spaceLeft;
                                    /*
                                     * curBlock may be NULL (the restore code
                                     * below already allows for that), so do
                                     * not dereference it blindly.
                                     */
                                    _nextBlock = (result->curBlock) ? result->curBlock->next : NULL;
                                    iSavedResultBlockStatus = TRUE;
                                }

                                /* Read another tuple of a normal query response */
                                if (getAnotherTuple(conn, msgLength,(pCscResult) ? &tuple : NULL, iStreamingCursorRows))
                                    return;
                            }
                            else
                            {
                                /* Skip another tuple of a normal query response */
                                if (skipAnotherTuple(conn, msgLength))
                                    return;
                            }

                            /*
                             * Output byte budget reached: leave the switch,
                             * let the common code below consume this message,
                             * then return from the function.
                             */
                            if (conn->output_nbytes && (conn->result->mem_used >= conn->output_nbytes))
                            {
                                go_return = 1;
                                break;
                            }

                            if(!skipRow)
                            {
                                if(pCscResult != NULL && pMessageLoopState)
                                {
                                    /* Add row count */
                                    incRowCountAndCheckForMaxRowsCsc(pCscResult);
                                    setRawRowLengthCsc(pCscResult, msgLength);

                                    if(!doesThresholdReachCsc(pCscResult))
                                    {
                                        int reachThresholdLimit = checkForThresholdLimitReachCsc(pCscResult,0, getRawRowLengthCsc(pCscResult), TRUE);

                                        if(reachThresholdLimit)
                                        {
                                            /* Store existing rows in tuples on disk */
                                            createFileCsc(pCscResult, conn->result);
                                            /*
                                             * From now on store rows on disk
                                             * without redoing the calculation,
                                             * because the threshold limit is
                                             * reached.
                                             */
                                            setThresholdReachCsc(pCscResult, TRUE);
                                            /* Create new thread to read rest of rows from socket */
                                            createThread = pMessageLoopState->multiThreads;
                                        }
                                    }
                                }

                                if(pCscResult != NULL && doesThresholdReachCsc(pCscResult))
                                {
                                    /* Store current row on disk */
                                    writeRowCsc(pCscResult, tuple, conn->result->numAttributes);

                                    /* Free the tuple, by restoring block status before the row read. */
                                    if(iSavedResultBlockStatus)
                                    {
                                        PGresult_data *block;

                                        iSavedResultBlockStatus = FALSE;

                                        /* Restore the result block status */
                                        result = conn->result;
                                        conn->curTuple = _curTuple;

                                        /* Free any new block(s) added in front */
                                        while ((block = result->curBlock) != _curBlock)
                                        {
                                            if(block)
                                            {
                                                result->curBlock = block->next;
                                                free(block);
                                            }
                                            else
                                                break;
                                        }

                                        if(result->curBlock)
                                        {
                                            /* Free any new block(s) added at next */
                                            while ((block = result->curBlock->next) != _nextBlock)
                                            {
                                                if(block)
                                                {
                                                    result->curBlock->next = block->next;
                                                    free(block);
                                                }
                                                else
                                                    break;
                                            }
                                        }

                                        /* Restore cur block status */
                                        result->curOffset = _curOffset;
                                        result->spaceLeft = _spaceLeft;
                                    }
                                }
                                else
                                {
                                    /* Store current row in memory */
                                    if(tuple)
                                    {
                                        addAnotherTuple(conn, msgLength, tuple, 0);
                                    }
                                    else
                                    {
                                        /* Is streaming cursor */
                                        if(iStreamingCursorMode)
                                        {
                                            if(conn->result
                                                && conn->result->totalntups == pCscStatementContext->m_StreamingCursorInfo.m_streamingCursorRows)
                                            {
                                                /*
                                                 * The batch holds the requested
                                                 * number of rows: mark this
                                                 * message consumed and stop.
                                                 */
                                                conn->inStart = conn->inCursor;

                                                /* Return to break the loop */
                                                if(piStop)
                                                    *piStop = TRUE;

                                                (pCscStatementContext->m_StreamingCursorInfo.m_streamResultBatchNumber)++;
                                                return;
                                            }
                                        } /* Streaming cursor */
                                    }
                                }
                            } /* !skip */

                            if(createThread)
                            {
                                int threadCreated;
                                int savInStart = conn->inStart;

                                pCscStatementContext->m_cscThreadCreated = TRUE;

                                /* Mark the current message consumed before handing off. */
                                conn->inStart = conn->inCursor;

                                threadCreated = createProcessingThreadCsc(pCscExecutor, pMessageLoopState, pCscResult,
                                                                          conn->result, pCscStatementContext);
                                if(threadCreated)
                                {
                                    /* Return to break the loop */
                                    if(piStop)
                                        *piStop = TRUE;
                                    return;
                                }
                                else
                                {
                                    /* No thread: undo the hand-off and keep parsing here. */
                                    conn->inStart = savInStart;
                                    pCscStatementContext->m_cscThreadCreated = FALSE;
                                }
                            } /* Create thread */
                        }
                        else if (conn->result != NULL &&
                                 conn->result->resultStatus == PGRES_FATAL_ERROR)
                        {
                            /*
                             * We've already choked for some reason.  Just
                             * discard tuples till we get to the end of the
                             * query.
                             */
                            conn->inCursor += msgLength;
                        }
                        else
                        {
                            /* Set up to report error at end of query */
                            printfPQExpBuffer(&conn->errorMessage,
                                              libpq_gettext("server sent data (\"D\" message) without prior row description (\"T\" message)\n"));
                            pqSaveErrorResult(conn);
                            /* Discard the unexpected message */
                            conn->inCursor += msgLength;
                        }
                        break;
                    }

                case 'G':       /* Start Copy In */
                    if (getCopyStart(conn, PGRES_COPY_IN))
                        return;
                    conn->asyncStatus = PGASYNC_COPY_IN;
                    break;

                case 'H':       /* Start Copy Out */
                    if (getCopyStart(conn, PGRES_COPY_OUT))
                        return;
                    conn->asyncStatus = PGASYNC_COPY_OUT;
                    conn->copy_already_done = 0;
                    break;

                case 'W':       /* Start Copy Both */
                    if (getCopyStart(conn, PGRES_COPY_BOTH))
                        return;
                    conn->asyncStatus = PGASYNC_COPY_BOTH;
                    conn->copy_already_done = 0;
                    break;

                case 'd':       /* Copy Data */
                    /*
                     * If we see Copy Data, just silently drop it.  This would
                     * only occur if application exits COPY OUT mode too
                     * early.
                     */
                    conn->inCursor += msgLength;
                    break;

                case 'c':       /* Copy Done */
                    /*
                     * If we see Copy Done, just silently drop it.  This is
                     * the normal case during PQendcopy.  We will keep
                     * swallowing data, expecting to see command-complete for
                     * the COPY command.
                     */
                    break;

                default:
                    printfPQExpBuffer(&conn->errorMessage,
                                      libpq_gettext(
                                                    "unexpected response from server; first received character was \"%c\"\n"),
                                      id);
                    /* build an error result holding the error message */
                    pqSaveErrorResult(conn);
                    /* not sure if we will see more, so go to ready state */
                    conn->asyncStatus = PGASYNC_READY;
                    /* Discard the unexpected message */
                    conn->inCursor += msgLength;
                    break;
            } /* switch on protocol character */
        }

        /*
         * Successfully consumed this message.  The header is 5 bytes: one
         * type byte plus the 4-byte length word.
         */
        if (conn->inCursor == conn->inStart + 5 + msgLength)
        {
            /* Normal case: parsing agrees with specified length */
            conn->inStart = conn->inCursor;
        }
        else
        {
            /* Trouble --- report it */
            printfPQExpBuffer(&conn->errorMessage,
                              libpq_gettext("message contents do not agree with length in message type \"%c\"\n"),
                              id);
            /* build an error result holding the error message */
            pqSaveErrorResult(conn);
            conn->asyncStatus = PGASYNC_READY;
            /* trust the specified message length as what to skip */
            conn->inStart += 5 + msgLength;
        }

        if (go_return)
            return;
    } /* Message loop */
}