in src/odbc/rsodbc/rslibpq.c [807:1340]
/*
 * Execute a statement through libpq, either directly (pszCmd) or as a
 * previously prepared statement (named by pStmt->szCursorName).
 *
 * For every row of the bound parameter set (APD array size, at least 1) the
 * bound C values are converted to text parameters and the command is sent to
 * the server. When pStmt->iMultiInsert is non-zero, the parameters of several
 * rows are accumulated and sent together once a full batch (or the final,
 * shorter "last batch") is collected.
 *
 * Parameters:
 *   pStmt              - Statement to execute; the connection is taken from
 *                        pStmt->phdbc.
 *   pszCmd             - Command text used when executePrepared is zero. May
 *                        be NULL when executePrepared is non-zero.
 *   executePrepared    - Non-zero to execute the prepared statement instead of
 *                        pszCmd.
 *   iCheckForRefCursor - Non-zero to run checkAndAutoFetchRefCursor() after a
 *                        successful execution (only when the connection
 *                        property iFetchRefCursor is set and no CSC thread
 *                        was created).
 *   iLockRequired      - Non-zero to take pConn->hSemMultiStmt for the
 *                        duration of the call and to wait for a running CSC
 *                        thread first.
 *
 * Returns:
 *   The SQLRETURN accumulated from the helpers: SQL_SUCCESS on success,
 *   SQL_NEED_DATA when a data-at-exec parameter was found (the call arguments
 *   are stored in pStmt for the later continuation), SQL_ERROR on failure.
 *   The connection semaphore, if taken, is released on every return path.
 */
SQLRETURN libpqExecuteDirectOrPreparedOnThread(RS_STMT_INFO *pStmt, char *pszCmd, int executePrepared, int iCheckForRefCursor, int iLockRequired)
{
    SQLRETURN rc = SQL_SUCCESS;
    RS_CONN_INFO *pConn = pStmt->phdbc;
    int iNumBindParams = 0;
    // Number of slots actually allocated in the three bind arrays below.
    // In multi-insert mode this is iNumBindParams * iMultiInsert, so it must
    // be tracked separately from iNumBindParams for a complete cleanup.
    int iAllocatedBindParams = 0;
    char **ppBindParamVals = NULL;
    int *piParamFormats = NULL;
    RS_BIND_PARAM_STR_BUF *pBindParamStrBuf = NULL;
    int iBindParam;
    int iBeginCommand = FALSE;
    int iCscThreadCreated = FALSE;
    int iLastBatchMultiInsertPrepare = FALSE;
    std::vector<Oid> paramTypes;

    // Use for legacy functions that need pointer and need to indicate null as
    // empty
    auto getParamTypesPtr = [&]() -> const Oid * {
        return paramTypes.empty() ? nullptr : paramTypes.data();
    };

    // Release the bind arrays and every conversion buffer they own.
    // Safe to call when nothing is allocated. The entries of ppBindParamVals
    // themselves are not freed here: the original code never freed them either.
    auto releaseBindParamBuffers = [&]() {
        if(ppBindParamVals)
            ppBindParamVals = (char **)rs_free(ppBindParamVals);

        if(piParamFormats)
            piParamFormats = (int *)rs_free(piParamFormats);

        if(pBindParamStrBuf)
        {
            for(int iSlot = 0; iSlot < iAllocatedBindParams; iSlot++)
            {
                if(pBindParamStrBuf[iSlot].iAllocDataLen > 0)
                {
                    pBindParamStrBuf[iSlot].pBuf = (char *)rs_free(pBindParamStrBuf[iSlot].pBuf);
                    pBindParamStrBuf[iSlot].iAllocDataLen = 0;
                }
            }

            pBindParamStrBuf = (RS_BIND_PARAM_STR_BUF *)rs_free(pBindParamStrBuf);
        }

        iAllocatedBindParams = 0;
    };

    if(iLockRequired)
    {
        // Lock connection sem to protect multiple stmt execution at same time.
        rsLockSem(pConn->hSemMultiStmt);

        // Wait for current csc thread to finish, if any.
        pgWaitForCscThreadToFinish(pConn->pgConn, FALSE);
    }

    // A command text is mandatory only for direct execution.
    if(pszCmd || executePrepared)
    {
        PGresult *pgResult = NULL;
        ExecStatusType pqRc = PGRES_COMMAND_OK;
        int asyncEnable = isAsyncEnable(pStmt);
        int sendStatus = 1;

        // Set query timeout in the server
        rc = setQueryTimeoutInServer(pStmt);

        if(rc == SQL_SUCCESS)
        {
            RS_DESC_HEADER &pIPDDescHeader = pStmt->pIPD->pDescHeader;
            RS_DESC_HEADER &pAPDDescHeader = pStmt->pStmtAttr->pAPD->pDescHeader;

            // Bind array/single value
            long lParamsToBind = (pAPDDescHeader.lArraySize <= 0) ? 1 : pAPDDescHeader.lArraySize;
            long lParamProcessed = 0;
            int iArrayBinding = (lParamsToBind > 1);
            SQLLEN iBindOffset = (pAPDDescHeader.plBindOffsetPtr) ? *(pAPDDescHeader.plBindOffsetPtr) : 0;
            int iMultiInsert = pStmt->iMultiInsert;
            int iLastBatchMultiInsert = pStmt->iLastBatchMultiInsert;
            int iOffset = 0;

            for(lParamProcessed = 0; lParamProcessed < lParamsToBind; lParamProcessed++)
            {
                // TRUE when the parameters collected so far must be sent now:
                // always for regular execution, and for multi-insert when a
                // full batch is complete or the final row of a short last
                // batch has been reached.
                const bool bIsLastRow = (lParamProcessed + 1 == lParamsToBind);
                const bool bExecuteNow = !iMultiInsert
                                            || (((lParamProcessed + 1) % iMultiInsert) == 0)
                                            || (iLastBatchMultiInsert && bIsLastRow);

                if(pStmt->pStmtAttr->pAPD->pDescRecHead)
                {
                    // Look for param at exec
                    if(!iMultiInsert && needDataAtExec(pStmt, pStmt->pStmtAttr->pAPD->pDescRecHead, lParamProcessed, executePrepared))
                    {
                        // Store arguments value for data-at-exec
                        pStmt->pszCmdDataAtExec = pszCmd;
                        pStmt->iExecutePreparedDataAtExec = executePrepared;
                        pStmt->lParamProcessedDataAtExec = lParamProcessed;
                        // NOTE(review): the row loop is not left here; later
                        // rows are still visited (but never executed because
                        // rc stays SQL_NEED_DATA) - confirm this is intended.
                        rc = SQL_NEED_DATA;
                    }
                    else
                    {
                        RS_DESC_REC *pDescRec;
                        RS_DESC_REC *pIPDRec;
                        // Initialized so the check below never reads an
                        // indeterminate value.
                        int iConversionError = 0;
                        int iNoOfParams = (pStmt->pPrepareHead) ? getNumberOfParams(pStmt) : getParamMarkerCount(pStmt);

                        iNumBindParams = countBindParams(pStmt->pStmtAttr->pAPD->pDescRecHead);

                        if(!executePrepared && (pStmt->iNumOfOutOnlyParams > 0
                                                    || (!iArrayBinding && iNumBindParams > 0)))
                            paramTypes = getParamTypes(iNumBindParams, pStmt->pStmtAttr->pAPD->pDescRecHead, pConn->pConnectProps);

                        // If user bind more than actually in query, we ignore unused.
                        if(iNoOfParams < iNumBindParams)
                            iNumBindParams = iNoOfParams;

                        // Regular execution allocates fresh arrays for every
                        // row; multi-insert allocates once per batch (arrays
                        // are NULL again after each flush).
                        if((iNumBindParams > 0) && (!iMultiInsert || (ppBindParamVals == NULL)))
                        {
                            int iSlots = (iMultiInsert) ? (iNumBindParams * iMultiInsert) : iNumBindParams;

                            ppBindParamVals = (char **)rs_calloc(iSlots, sizeof(char *));
                            piParamFormats = (int *)rs_calloc(iSlots, sizeof(int));
                            pBindParamStrBuf = (RS_BIND_PARAM_STR_BUF *)rs_calloc(iSlots, sizeof(RS_BIND_PARAM_STR_BUF));
                            iAllocatedBindParams = iSlots;

                            if(ppBindParamVals == NULL
                                || piParamFormats == NULL
                                || pBindParamStrBuf == NULL)
                            {
                                rc = SQL_ERROR;
                                goto error;
                            }
                        }

                        for(iBindParam = 0; iBindParam < iNumBindParams; iBindParam++)
                        {
                            int iValOffset = 0;

                            pDescRec = findDescRec(pStmt->pStmtAttr->pAPD, iBindParam + 1);
                            pIPDRec = findDescRec(pStmt->pIPD, iBindParam + 1);

                            if(!pDescRec)
                            {
                                rc = SQL_ERROR;
                                addError(&pStmt->pErrorList,"HY000", "Bind parameter not found.", 0, NULL);
                                goto error;
                            }

                            if(iArrayBinding)
                            {
                                if(pAPDDescHeader.lBindType == SQL_BIND_BY_COLUMN)
                                {
                                    // Column wise binding
                                    iValOffset = pDescRec->iOctetLen;
                                    if(!iValOffset)
                                    {
                                        rc = SQL_ERROR;
                                        addError(&pStmt->pErrorList,"HY000", "Array element length is zero.", 0, NULL);
                                        goto error;
                                    }
                                }
                                else
                                {
                                    // Row wise binding
                                    iValOffset = pAPDDescHeader.lBindType;
                                }
                            }

                            // Multi-insert keeps a running slot index across
                            // rows of the batch; otherwise slot == param index.
                            iOffset = (iMultiInsert) ? iOffset : iBindParam;

                            if(pDescRec->hInOutType == SQL_PARAM_OUTPUT)
                            {
                                // Output-only parameter: send a placeholder.
                                // The literal is never written to or freed.
                                ppBindParamVals[iOffset] = const_cast<char *>("null");
                            }
                            else
                            {
                                char *pParamData = NULL;
                                int iParamDataLen = 0;
                                SQLLEN *plParamDataStrLenInd = NULL;
                                short hPrepSQLType;

                                if(pDescRec->pDataAtExec)
                                {
                                    // Value was supplied through data-at-exec.
                                    pParamData = pDescRec->pDataAtExec->pValue;
                                    iParamDataLen = pDescRec->pDataAtExec->cbLen;
                                    plParamDataStrLenInd = (SQLLEN *)(void *)(&(pDescRec->pDataAtExec->cbLen));
                                }
                                else
                                {
                                    if(pDescRec->pValue)
                                        pParamData = (char *)pDescRec->pValue + (lParamProcessed * iValOffset) + iBindOffset;

                                    iParamDataLen = pDescRec->cbLen;

                                    // NOTE(review): the indicator pointer is
                                    // advanced by the row index only; the
                                    // row-wise stride and the bind offset are
                                    // not applied here - confirm.
                                    if(pDescRec->pcbLenInd)
                                        plParamDataStrLenInd = (SQLLEN *)(void *)(pDescRec->pcbLenInd + lParamProcessed);
                                }

                                // Prefer the type recorded in the IPD, if any.
                                if(pIPDRec && pIPDRec->hType != 0)
                                    hPrepSQLType = pIPDRec->hType;
                                else
                                    hPrepSQLType = pDescRec->hParamSQLType;

                                ppBindParamVals[iOffset] = convertCParamDataToSQLData(
                                                                pStmt,
                                                                pParamData,
                                                                iParamDataLen,
                                                                plParamDataStrLenInd,
                                                                pDescRec->hType,
                                                                pDescRec->hParamSQLType,
                                                                hPrepSQLType,
                                                                &(pBindParamStrBuf[iOffset]),
                                                                &iConversionError);

                                if(iConversionError)
                                {
                                    rc = SQL_ERROR;
                                    goto error;
                                }
                            }

                            piParamFormats[iOffset] = RS_TEXT_FORMAT;
                            iOffset++;
                        } // Bind param loop

                        // Put the param processed count
                        if(pIPDDescHeader.valid)
                        {
                            // Param processed count
                            if(pIPDDescHeader.plRowsProcessedPtr)
                                *(pIPDDescHeader.plRowsProcessedPtr) = lParamProcessed + 1;

                            // Param status
                            if(pIPDDescHeader.phArrayStatusPtr)
                            {
                                // Anything other than (qualified) success is
                                // reported as an error for this row.
                                short hParamStatus = SQL_PARAM_ERROR;

                                if(rc == SQL_SUCCESS)
                                    hParamStatus = SQL_PARAM_SUCCESS;
                                else if(rc == SQL_SUCCESS_WITH_INFO)
                                    hParamStatus = SQL_PARAM_SUCCESS_WITH_INFO;

                                *(pIPDDescHeader.phArrayStatusPtr + lParamProcessed) = hParamStatus;
                            }
                        }
                    } /* !Data_At_Exec */
                }

                if(bExecuteNow)
                {
                    // Execute it using libpq
                    if(rc != SQL_NEED_DATA)
                    {
                        int iStopFlag = FALSE;
                        int nParams = 0;
                        int iReadOutParamVals = pStmt->iFunctionCall;

                        if(iMultiInsert)
                        {
                            if(iLastBatchMultiInsert && bIsLastRow)
                            {
                                nParams = (iNumBindParams * iLastBatchMultiInsert);

                                if(!executePrepared)
                                    pszCmd = pStmt->pszLastBatchMultiInsertCmd->pBuf;
                                else
                                {
                                    iLastBatchMultiInsertPrepare = TRUE;

                                    // De-allocate previous multi-insert command and prepare last batch multi-insert command
                                    rc = rePrepareMultiInsertCommand(pStmt, pStmt->pszLastBatchMultiInsertCmd->pBuf);
                                    if(rc == SQL_ERROR)
                                        goto error;
                                }
                            }
                            else
                                nParams = (iNumBindParams * iMultiInsert);
                        }
                        else
                            nParams = iNumBindParams;

                        // Look for whether to execute BEGIN or not
                        if((pConn->pConnAttr->iAutoCommit == SQL_AUTOCOMMIT_OFF || pStmt->iFunctionCall == TRUE)
                            && libpqIsTransactionIdle(pConn) && (lParamProcessed == 0 || iMultiInsert))
                        {
                            iBeginCommand = TRUE;
                            rc = libpqExecuteTransactionCommand(pConn, BEGIN_CMD, FALSE);
                            if(rc == SQL_ERROR)
                                goto error;
                        }

                        if(asyncEnable)
                        {
                            // Send the command, then pick up the first result.
                            if(!executePrepared)
                            {
                                if(iNumBindParams)
                                    sendStatus = PQsendQueryParams(pConn->pgConn, pszCmd, nParams, getParamTypesPtr(),
                                                                    (const char *const *)ppBindParamVals, NULL,
                                                                    piParamFormats, RS_TEXT_FORMAT);
                                else
                                    sendStatus = PQsendQuery(pConn->pgConn, pszCmd);
                            }
                            else
                            {
                                sendStatus = PQsendQueryPrepared(pConn->pgConn, pStmt->szCursorName, nParams,
                                                                    (const char *const *)ppBindParamVals, NULL,
                                                                    piParamFormats, RS_TEXT_FORMAT);
                            }

                            if(sendStatus)
                            {
                                pgResult = pqGetResult(pConn->pgConn, pStmt->pCscStatementContext);
                                pqRc = PQresultStatus(pgResult);
                            }
                            else
                                pqRc = PGRES_FATAL_ERROR;
                        }
                        else
                        {
                            if(!executePrepared)
                            {
                                if(iNumBindParams)
                                {
                                    pgResult = pqexecParams(
                                        pConn->pgConn, pszCmd, nParams, getParamTypesPtr(),
                                        (const char *const *)ppBindParamVals, NULL,
                                        piParamFormats, RS_TEXT_FORMAT,
                                        pStmt->pCscStatementContext);
                                }
                                else
                                {
                                    pgResult = pqExec(pConn->pgConn, pszCmd,
                                                        pStmt->pCscStatementContext);
                                }
                            }
                            else
                            {
                                pgResult = pqExecPrepared(
                                    pConn->pgConn, pStmt->szCursorName, nParams,
                                    (const char *const *)ppBindParamVals, NULL,
                                    piParamFormats, RS_TEXT_FORMAT,
                                    pStmt->pCscStatementContext);
                            }

                            pqRc = PQresultStatus(pgResult);
                        }

                        // Multi result loop
                        do
                        {
                            // Even one result in error, we are returning error.
                            rc = setResultInStmt(rc, pStmt, pgResult, FALSE, pqRc, &iStopFlag, iArrayBinding);
                            if(iStopFlag)
                                break;

                            if(isStreamingCursorMode(pStmt) && (rc != SQL_ERROR))
                            {
                                iReadOutParamVals = FALSE;
                                break; // We are not looping for multiple result right now
                            }

                            if(getCscThreadCreatedFlag(pStmt->pCscStatementContext))
                            {
                                // CSC processing will happen on a thread.
                                iCscThreadCreated = TRUE;

                                // Reset the flag as we are breaking the loop
                                setCscThreadCreatedFlag(pStmt->pCscStatementContext, FALSE);
                                iReadOutParamVals = FALSE;
                                break;
                            }

                            // If command not send, no need to check for next result.
                            if(asyncEnable && !sendStatus)
                                break;

                            // Get next result
                            pgResult = pqGetResult(pConn->pgConn, pStmt->pCscStatementContext);
                            if(!pgResult)
                                break;

                            // Get result status
                            pqRc = PQresultStatus(pgResult);
                        } while(TRUE); // Results loop

                        if(rc == SQL_SUCCESS)
                        {
                            // Put the OUT parameter values, if any
                            if(iReadOutParamVals
                                && (pStmt->iNumOfOutOnlyParams > 0
                                    || pStmt->iNumOfInOutOnlyParams > 0))
                            {
                                int rc1 = updateOutBindParametersValue(pStmt);

                                if(rc1 == SQL_ERROR)
                                {
                                    rc = rc1;
                                    addError(&pStmt->pErrorList, "HY000", "OUT parameter processing issue", 0, NULL);
                                    goto error;
                                }
                            }
                        }
                    } // !SQL_NEED_DATA
                }

                // Clean param buffers once the row (or the batch) was handled
                if((iNumBindParams > 0) && bExecuteNow)
                {
                    releaseBindParamBuffers();
                    paramTypes.clear();
                    iOffset = 0;
                }
            } // Array binding loop

            if(iLastBatchMultiInsertPrepare)
            {
                iLastBatchMultiInsertPrepare = FALSE;

                // De-allocate previous multi-insert command and prepare regular multi-insert command
                rc = rePrepareMultiInsertCommand(pStmt, pStmt->pCmdBuf->pBuf);
                if(rc == SQL_ERROR)
                    goto error;
            }
        } // SQL_SUCCESS
    }
    else
    {
        rc = SQL_ERROR;
        addError(&pStmt->pErrorList,"HY000", "Invalid command buffer", 0, NULL);
        goto error;
    }

    // Nothing below uses the bind arrays. Release whatever is still allocated
    // (e.g. a trailing partial multi-insert batch that was never flushed) so
    // it is not leaked on the successful return path.
    releaseBindParamBuffers();

    // This will do recursion call, without lock
    if(iCheckForRefCursor && rc == SQL_SUCCESS && pConn->pConnectProps->iFetchRefCursor)
    {
        // TODO: We have to do RefCursor processing after CSC thread is done or do it for one result at a time.
        if(!iCscThreadCreated)
        {
            // Check for refcursor in result list, if exist execute fetch all and replace result node in-place.
            // Note that this may call this function again, so it become recursion.
            rc = checkAndAutoFetchRefCursor(pStmt);
            if(rc == SQL_ERROR)
                goto error;
        }
    }

    if(pStmt->pCopyCmd)
    {
        // Was it COPY command with STDIN or CLIENT?
        rc = checkAndHandleCopyStdinOrClient(pStmt, rc, FALSE);
    }
    else if(pStmt->pUnloadCmd)
    {
        // Was it UNLOAD command with CLIENT?
        rc = checkAndHandleCopyOutClient(pStmt, rc);
    }

    if(iBeginCommand)
    {
        if(pConn->pConnAttr->iAutoCommit != SQL_AUTOCOMMIT_OFF
            && pStmt->iFunctionCall == TRUE
            && !libpqIsTransactionIdle(pConn))
        {
            // Send commit/rollback after function call, if auto commit is ON.
            libpqExecuteTransactionCommand(pConn, (char *)((rc != SQL_ERROR) ? COMMIT_CMD : ROLLBACK_CMD), FALSE);
        }
    }

    if(iLockRequired)
    {
        // Unlock connection sem
        rsUnlockSem(pConn->hSemMultiStmt);
    }

    return rc;

error:

    // Clean param buffers (all allocated slots, including multi-insert ones)
    releaseBindParamBuffers();

    if(iLockRequired)
    {
        // Unlock connection sem
        rsUnlockSem(pConn->hSemMultiStmt);
    }

    return rc;
}