SQLRETURN libpqExecuteDirectOrPreparedOnThread()

in src/odbc/rsodbc/rslibpq.c [807:1340]


// Execute a SQL command on the statement's connection through libpq, either
// directly from its text or as an already prepared statement.
//
// For every parameter set described by the APD header (lArraySize, at least
// one) the bound parameters are converted to their text form, the command is
// sent with the async or sync libpq wrappers (chosen by isAsyncEnable()), and
// every result is handed to setResultInStmt(). In multi-insert mode
// (pStmt->iMultiInsert != 0) the converted values of several parameter sets
// are accumulated in one buffer and the command is sent once per iMultiInsert
// sets, plus once for the shorter last batch when
// pStmt->iLastBatchMultiInsert is set.
//
// Parameters:
//   pStmt              Statement to execute.
//   pszCmd             SQL text for direct execution. May be NULL only when
//                      executePrepared is non-zero.
//   executePrepared    Non-zero to execute the statement prepared under
//                      pStmt->szCursorName instead of pszCmd.
//   iCheckForRefCursor Non-zero to call checkAndAutoFetchRefCursor() after a
//                      successful execution (only when the connection has
//                      iFetchRefCursor set and no CSC thread was started).
//   iLockRequired      Non-zero to hold pConn->hSemMultiStmt for the whole
//                      call and to wait for a running CSC thread first.
//
// Returns the accumulated SQLRETURN: SQL_NEED_DATA when a data-at-exec
// parameter was found, SQL_ERROR on failure (some failures also add an entry
// to pStmt->pErrorList), otherwise the status produced by setResultInStmt()
// and the follow-up refcursor/COPY/UNLOAD handlers.
SQLRETURN libpqExecuteDirectOrPreparedOnThread(RS_STMT_INFO *pStmt, char *pszCmd, int executePrepared, int iCheckForRefCursor, int iLockRequired)
{
    SQLRETURN rc = SQL_SUCCESS;
    RS_CONN_INFO *pConn = pStmt->phdbc;
    int  iNumBindParams = 0;
    // Number of entries currently allocated in each of the three bind buffers.
    // In multi-insert mode this is larger than iNumBindParams.
    int  iAllocatedBindParams = 0;
    char **ppBindParamVals = NULL;
    int *piParamFormats = NULL;
    RS_BIND_PARAM_STR_BUF *pBindParamStrBuf = NULL;
    int iBindParam;
    int iBeginCommand = FALSE;
    int iCscThreadCreated = FALSE;
    int iLastBatchMultiInsertPrepare = FALSE;
    std::vector<Oid> paramTypes;
    // Use for legacy functions that need pointer and need to indicate null as
    // empty
    auto getParamTypesPtr = [&]() -> const Oid * {
        return paramTypes.empty() ? nullptr : paramTypes.data();
    };

    // Allocate the value/format/string buffers with iCount zeroed entries each.
    // Returns false when any allocation failed; the caller jumps to the error
    // path, which releases whatever was allocated.
    auto allocateBindParamBuffers = [&](int iCount) -> bool {
        ppBindParamVals  = (char **)rs_calloc(iCount, sizeof(char *));
        piParamFormats   = (int *)rs_calloc(iCount, sizeof(int));
        pBindParamStrBuf = (RS_BIND_PARAM_STR_BUF *)rs_calloc(iCount, sizeof(RS_BIND_PARAM_STR_BUF));
        iAllocatedBindParams = iCount;

        return (ppBindParamVals != NULL)
                && (piParamFormats != NULL)
                && (pBindParamStrBuf != NULL);
    };

    // Release the bind buffers, including every per-entry string buffer that
    // was allocated. Walks all allocated entries (not just iNumBindParams) so
    // the rows of a multi-insert batch are released as well.
    auto releaseBindParamBuffers = [&]() {
        ppBindParamVals = (char **)rs_free(ppBindParamVals);
        piParamFormats  = (int *)rs_free(piParamFormats);

        if(pBindParamStrBuf)
        {
            for(int iEntry = 0; iEntry < iAllocatedBindParams; iEntry++)
            {
                if(pBindParamStrBuf[iEntry].iAllocDataLen > 0)
                {
                    pBindParamStrBuf[iEntry].pBuf = (char *)rs_free(pBindParamStrBuf[iEntry].pBuf);
                    pBindParamStrBuf[iEntry].iAllocDataLen = 0;
                }
            }

            pBindParamStrBuf = (RS_BIND_PARAM_STR_BUF *)rs_free(pBindParamStrBuf);
        }

        iAllocatedBindParams = 0;
    };

    if(iLockRequired)
    {
        // Lock connection sem to protect multiple stmt execution at same time.
        rsLockSem(pConn->hSemMultiStmt);

        // Wait for current csc thread to finish, if any.
        pgWaitForCscThreadToFinish(pConn->pgConn, FALSE);
    }

    // Direct execution needs the command text; prepared execution does not.
    if(pszCmd || executePrepared)
    {
        PGresult *pgResult = NULL;
        ExecStatusType pqRc = PGRES_COMMAND_OK;
        int asyncEnable = isAsyncEnable(pStmt);
        int sendStatus = 1;

        // Set query timeout in the server
        rc = setQueryTimeoutInServer(pStmt);

        if(rc == SQL_SUCCESS)
        {
            RS_DESC_HEADER &pIPDDescHeader = pStmt->pIPD->pDescHeader;
            RS_DESC_HEADER &pAPDDescHeader = pStmt->pStmtAttr->pAPD->pDescHeader;

            // Bind array/single value
            long lParamsToBind = (pAPDDescHeader.lArraySize <= 0) ? 1 : pAPDDescHeader.lArraySize;
            long lParamProcessed = 0;
            int  iArrayBinding = (lParamsToBind > 1);
            SQLLEN  iBindOffset = (pAPDDescHeader.plBindOffsetPtr) ? *(pAPDDescHeader.plBindOffsetPtr) : 0;
            int  iMultiInsert = pStmt->iMultiInsert;
            int  iLastBatchMultiInsert = pStmt->iLastBatchMultiInsert;
            int  iOffset = 0;

            for(lParamProcessed = 0; lParamProcessed < lParamsToBind; lParamProcessed++)
            {
                if(pStmt->pStmtAttr->pAPD->pDescRecHead)
                {
                    // Look for param at exec
                    if(!iMultiInsert && needDataAtExec(pStmt, pStmt->pStmtAttr->pAPD->pDescRecHead, lParamProcessed, executePrepared))
                    {
                        // Store arguments value for data-at-exec
                        pStmt->pszCmdDataAtExec = pszCmd;
                        pStmt->iExecutePreparedDataAtExec = executePrepared;
                        pStmt->lParamProcessedDataAtExec = lParamProcessed;
                        rc = SQL_NEED_DATA;
                    }
                    else
                    {
                        RS_DESC_REC *pDescRec;
                        RS_DESC_REC *pIPDRec;
                        int iConversionError = FALSE;
                        int iNoOfParams = (pStmt->pPrepareHead) ? getNumberOfParams(pStmt) : getParamMarkerCount(pStmt);

                        iNumBindParams = countBindParams(pStmt->pStmtAttr->pAPD->pDescRecHead);

                        // Parameter type OIDs are only passed on direct execution.
                        if(!executePrepared && (pStmt->iNumOfOutOnlyParams > 0
                                                    || (!iArrayBinding && iNumBindParams > 0)))
                            paramTypes = getParamTypes(iNumBindParams, pStmt->pStmtAttr->pAPD->pDescRecHead, pConn->pConnectProps);

                        // If user bind more than actually in query, we ignore unused.
                        if(iNoOfParams < iNumBindParams)
                            iNumBindParams = iNoOfParams;

                        if(!iMultiInsert)
                        {
                            // One set of buffers per parameter set; released after each execution.
                            if((iNumBindParams > 0) && !allocateBindParamBuffers(iNumBindParams))
                            {
                                rc = SQL_ERROR;
                                goto error;
                            }
                        }
                        else
                        {
                            // One set of buffers per batch; allocated on the first row of the batch.
                            if((iNumBindParams > 0) && (ppBindParamVals == NULL)
                                && !allocateBindParamBuffers(iNumBindParams * iMultiInsert))
                            {
                                rc = SQL_ERROR;
                                goto error;
                            }
                        }

                        for(iBindParam = 0; iBindParam < iNumBindParams; iBindParam++)
                        {
                            int iValOffset = 0;

                            pDescRec = findDescRec(pStmt->pStmtAttr->pAPD, iBindParam + 1);
                            pIPDRec  = findDescRec(pStmt->pIPD, iBindParam + 1);

                            if(!pDescRec)
                            {
                                rc = SQL_ERROR;
                                addError(&pStmt->pErrorList,"HY000", "Bind parameter not found.", 0, NULL);
                                goto error;
                            }

                            if(iArrayBinding)
                            {
                                if(pAPDDescHeader.lBindType == SQL_BIND_BY_COLUMN)
                                {
                                    // Column wise binding
                                    iValOffset = pDescRec->iOctetLen;

                                    if(!iValOffset)
                                    {
                                        rc = SQL_ERROR;
                                        addError(&pStmt->pErrorList,"HY000", "Array element length is zero.", 0, NULL);
                                        goto error;
                                    }
                                }
                                else
                                {
                                    // Row wise binding
                                    iValOffset = pAPDDescHeader.lBindType;
                                }
                            }

                            // In multi-insert mode iOffset keeps running across the rows of
                            // the batch; otherwise it is simply the parameter index.
                            iOffset = (iMultiInsert) ? iOffset : iBindParam;

                            if (pDescRec->hInOutType == SQL_PARAM_OUTPUT)
                            {
                                // Output-only parameter: send a placeholder value. The cast is
                                // needed because the value array holds non-const pointers.
                                ppBindParamVals[iOffset] = (char *)"null";
                            }
                            else
                            {
                                char *pParamData = NULL;
                                int iParamDataLen = 0;
                                SQLLEN *plParamDataStrLenInd = NULL;
                                short hPrepSQLType;

                                if (pDescRec->pDataAtExec)
                                {
                                    // Value was supplied through the data-at-exec mechanism.
                                    pParamData = pDescRec->pDataAtExec->pValue;
                                    iParamDataLen = pDescRec->pDataAtExec->cbLen;
                                    plParamDataStrLenInd = (SQLLEN *)(void *)(&(pDescRec->pDataAtExec->cbLen));
                                }
                                else
                                {
                                    // Value comes from the application's bound buffer.
                                    if (pDescRec->pValue)
                                        pParamData = (char *)pDescRec->pValue + (lParamProcessed * iValOffset) + iBindOffset;

                                    iParamDataLen = pDescRec->cbLen;

                                    if (pDescRec->pcbLenInd)
                                        plParamDataStrLenInd = (SQLLEN *)(void *)(pDescRec->pcbLenInd + lParamProcessed);
                                }

                                // Prefer the type recorded in the IPD, if any.
                                if (pIPDRec && pIPDRec->hType != 0)
                                    hPrepSQLType = pIPDRec->hType;
                                else
                                    hPrepSQLType = pDescRec->hParamSQLType;

                                ppBindParamVals[iOffset] = convertCParamDataToSQLData(
                                    pStmt,
                                    pParamData,
                                    iParamDataLen,
                                    plParamDataStrLenInd,
                                    pDescRec->hType,
                                    pDescRec->hParamSQLType,
                                    hPrepSQLType,
                                    &(pBindParamStrBuf[iOffset]),
                                    &iConversionError);
                                if (iConversionError)
                                {
                                    rc = SQL_ERROR;
                                    goto error;
                                }
                            }

                            piParamFormats[iOffset] = RS_TEXT_FORMAT;

                            iOffset++;

                        } // Bind param loop

                        // Put the param processed count
                        if(pIPDDescHeader.valid)
                        {
                            // Param processed count
                            if(pIPDDescHeader.plRowsProcessedPtr)
                                *(pIPDDescHeader.plRowsProcessedPtr) = lParamProcessed + 1;

                            // Param status
                            if(pIPDDescHeader.phArrayStatusPtr)
                            {
                                short hParamStatus;

                                if(rc == SQL_SUCCESS)
                                    hParamStatus = SQL_PARAM_SUCCESS;
                                else
                                if(rc == SQL_SUCCESS_WITH_INFO)
                                    hParamStatus = SQL_PARAM_SUCCESS_WITH_INFO;
                                else
                                    hParamStatus = SQL_PARAM_ERROR;

                                *(pIPDDescHeader.phArrayStatusPtr + lParamProcessed) = hParamStatus;
                            }
                        }

                    } /* !Data_At_Exec */
                }

                // A command is sent after every parameter set, except in multi-insert
                // mode where it is sent once a full batch, or the last (shorter) batch,
                // has been collected.
                const bool bBatchComplete = !iMultiInsert
                                            || (((lParamProcessed + 1) % iMultiInsert) == 0)
                                            || (iLastBatchMultiInsert && (lParamProcessed + 1 == lParamsToBind));

                if(bBatchComplete)
                {
                    // Execute it using libpq
                    if(rc != SQL_NEED_DATA)
                    {
                        int iStopFlag = FALSE;
                        int nParams = 0;
                        int iReadOutParamVals = pStmt->iFunctionCall;

                        if(iMultiInsert)
                        {
                            if(iLastBatchMultiInsert && (lParamProcessed + 1 == lParamsToBind))
                            {
                                nParams = (iNumBindParams * iLastBatchMultiInsert);

                                if(!executePrepared)
                                    pszCmd = pStmt->pszLastBatchMultiInsertCmd->pBuf;
                                else
                                {
                                    SQLRETURN rcRePrepare;

                                    iLastBatchMultiInsertPrepare = TRUE;
                                    // De-allocate previous multi-insert command and prepare last batch multi-insert command
                                    rcRePrepare = rePrepareMultiInsertCommand(pStmt, pStmt->pszLastBatchMultiInsertCmd->pBuf);
                                    if(rcRePrepare == SQL_ERROR)
                                    {
                                        rc = SQL_ERROR;
                                        goto error;
                                    }

                                    // Do not let a successful re-prepare hide an error or
                                    // warning recorded for an earlier batch.
                                    if(rc == SQL_SUCCESS)
                                        rc = rcRePrepare;
                                }
                            }
                            else
                                nParams = (iNumBindParams * iMultiInsert);
                        }
                        else
                            nParams = iNumBindParams;

                        // Look for whether to execute BEGIN or not
                        if((pConn->pConnAttr->iAutoCommit == SQL_AUTOCOMMIT_OFF || pStmt->iFunctionCall == TRUE)
                            && libpqIsTransactionIdle(pConn) && (lParamProcessed == 0 || iMultiInsert))
                        {
                            iBeginCommand = TRUE;
                            rc = libpqExecuteTransactionCommand(pConn, BEGIN_CMD, FALSE);
                            if(rc == SQL_ERROR)
                                goto error;
                        }

                        if(asyncEnable)
                        {
                            // Send the command, then pick up the first result.
                            if(!executePrepared)
                            {
                                if(iNumBindParams)
                                    sendStatus = PQsendQueryParams(pConn->pgConn, pszCmd, nParams, getParamTypesPtr(),
                                                                   (const char *const *)ppBindParamVals, NULL,
                                                                   piParamFormats, RS_TEXT_FORMAT);
                                else
                                    sendStatus = PQsendQuery(pConn->pgConn, pszCmd);
                            }
                            else
                            {
                                sendStatus = PQsendQueryPrepared(pConn->pgConn, pStmt->szCursorName, nParams,
                                                                 (const char *const *)ppBindParamVals, NULL,
                                                                 piParamFormats, RS_TEXT_FORMAT);
                            }

                            if(sendStatus)
                            {
                                pgResult = pqGetResult(pConn->pgConn, pStmt->pCscStatementContext);
                                pqRc = PQresultStatus(pgResult);
                            }
                            else
                                pqRc = PGRES_FATAL_ERROR;
                        }
                        else
                        {
                            if(!executePrepared)
                            {
                                if(iNumBindParams)
                                {
                                    pgResult = pqexecParams(
                                        pConn->pgConn, pszCmd, nParams, getParamTypesPtr(),
                                        (const char *const *)ppBindParamVals, NULL,
                                        piParamFormats, RS_TEXT_FORMAT,
                                        pStmt->pCscStatementContext);
                                }
                                else
                                {
                                    pgResult = pqExec(pConn->pgConn, pszCmd,
                                                      pStmt->pCscStatementContext);
                                }
                            }
                            else
                            {
                                pgResult = pqExecPrepared(
                                    pConn->pgConn, pStmt->szCursorName, nParams,
                                    (const char *const *)ppBindParamVals, NULL,
                                    piParamFormats, RS_TEXT_FORMAT,
                                    pStmt->pCscStatementContext);
                            }

                            pqRc = PQresultStatus(pgResult);
                        }

                        // Multi result loop
                        do
                        {
                            // Even one result in error, we are returning error.
                            rc = setResultInStmt(rc, pStmt, pgResult, FALSE, pqRc, &iStopFlag, iArrayBinding);
                            if(iStopFlag)
                                break;

                            if((isStreamingCursorMode(pStmt))
                                && (rc != SQL_ERROR)
                            )
                            {
                                iReadOutParamVals = FALSE;
                                break; // We are not looping for multiple result right now
                            }

                            if(getCscThreadCreatedFlag(pStmt->pCscStatementContext))
                            {
                                // CSC processing will happen on a thread.
                                iCscThreadCreated = TRUE;

                                // Reset the flag as we are breaking the loop
                                setCscThreadCreatedFlag(pStmt->pCscStatementContext, FALSE);

                                iReadOutParamVals = FALSE;

                                break;
                            }

                            // If command not send, no need to check for next result.
                            if(asyncEnable && !sendStatus)
                                break;

                            // Get next result
                            pgResult = pqGetResult(pConn->pgConn, pStmt->pCscStatementContext);
                            if(!pgResult)
                                break;

                            // Get result status
                            pqRc = PQresultStatus(pgResult);
                        }while(TRUE); // Results  loop

                        if (rc == SQL_SUCCESS)
                        {
                            // Put the OUT parameter values, if any
                            if (iReadOutParamVals
                                && (pStmt->iNumOfOutOnlyParams > 0
                                    || pStmt->iNumOfInOutOnlyParams > 0))
                            {
                                int rc1 = updateOutBindParametersValue(pStmt);
                                if (rc1 == SQL_ERROR)
                                {
                                    rc = rc1;
                                    addError(&pStmt->pErrorList, "HY000", "OUT parameter processing issue", 0, NULL);
                                    goto error;
                                }
                            }
                        }
                    } // !SQL_NEED_DATA
                }

                // Clean param buffers
                if((iNumBindParams > 0) && bBatchComplete)
                {
                    releaseBindParamBuffers();

                    paramTypes.clear();

                    // Next multi-insert batch starts filling from the first slot again.
                    iOffset = 0;
                }
            } // Array binding loop

            if(iLastBatchMultiInsertPrepare)
            {
                SQLRETURN rcRePrepare;

                iLastBatchMultiInsertPrepare = FALSE;
                // De-allocate previous multi-insert command and prepare regular multi-insert command
                rcRePrepare = rePrepareMultiInsertCommand(pStmt, pStmt->pCmdBuf->pBuf);
                if(rcRePrepare == SQL_ERROR)
                {
                    rc = SQL_ERROR;
                    goto error;
                }

                // Keep the execution status: a successful re-prepare must not turn
                // a failed execution into a success.
                if(rc == SQL_SUCCESS)
                    rc = rcRePrepare;
            }
        } // SQL_SUCCESS
    }
    else
    {
        rc = SQL_ERROR;
        addError(&pStmt->pErrorList,"HY000", "Invalid command buffer", 0, NULL);
        goto error;
    }

    // This will do recursion call, without lock
    if(iCheckForRefCursor && rc == SQL_SUCCESS && pConn->pConnectProps->iFetchRefCursor)
    {
        // TODO: We have to do RefCursor processing after CSC thread is done or do it for one result at a time.
        if(!iCscThreadCreated)
        {
            // Check for refcursor in result list, if exist execute fetch all and replace result node in-place.
            // Note that this may call this function again, so it become recursion.
            rc = checkAndAutoFetchRefCursor(pStmt);
            if(rc == SQL_ERROR)
                goto error;
        }
    }

    if(pStmt->pCopyCmd)
    {
        // Was it COPY command with STDIN or CLIENT?
        rc = checkAndHandleCopyStdinOrClient(pStmt,rc,FALSE);
    }
    else
    if(pStmt->pUnloadCmd)
    {
        // Was it UNLOAD command with CLIENT?
        rc = checkAndHandleCopyOutClient(pStmt,rc);
    }

    if(iBeginCommand)
    {
        if(pConn->pConnAttr->iAutoCommit != SQL_AUTOCOMMIT_OFF
            && pStmt->iFunctionCall == TRUE
            && !libpqIsTransactionIdle(pConn))
        {
            // Send commit/rollback after function call, if auto commit is ON.
            libpqExecuteTransactionCommand(pConn, (char *)((rc != SQL_ERROR) ? COMMIT_CMD : ROLLBACK_CMD), FALSE);
        }
    }

    if(iLockRequired)
    {
        // Unlock connection sem
        rsUnlockSem(pConn->hSemMultiStmt);
    }

    return rc;

error:

    // NOTE(review): this path does not issue the COMMIT/ROLLBACK that the normal
    // path sends for function calls under autocommit; confirm that callers deal
    // with a transaction that may still be open here.

    // Clean param buffers
    if(iNumBindParams > 0)
        releaseBindParamBuffers();

    if(iLockRequired)
    {
        // Unlock connection sem
        rsUnlockSem(pConn->hSemMultiStmt);
    }

    return rc;
}