RETCODE SQL_API OPENSEARCHAPI_ExtendedFetch()

in sql-odbc/src/sqlodbc/results.c [1217:1544]


RETCODE SQL_API OPENSEARCHAPI_ExtendedFetch(HSTMT hstmt, SQLUSMALLINT fFetchType,
                                    SQLLEN irow, SQLULEN *pcrow,
                                    SQLUSMALLINT *rgfRowStatus,
                                    SQLLEN bookmark_offset, SQLLEN rowsetSize) {
    UNUSED(bookmark_offset, irow);
    CSTR func = "OPENSEARCHAPI_ExtendedFetch";
    StatementClass *stmt = (StatementClass *)hstmt;
    ARDFields *opts;
    QResultClass *res;
    BindInfoClass *bookmark;
    SQLLEN num_tuples, i, fc_io;
    SQLLEN save_rowset_size, progress_size;
    SQLLEN rowset_start, rowset_end = (-1);
    RETCODE result = SQL_SUCCESS;
    char truncated, error, should_set_rowset_start = FALSE;
    SQLLEN currp;
    UWORD pstatus;
    BOOL currp_is_valid, reached_eof, useCursor;
    SQLLEN reqsize = rowsetSize;

    MYLOG(OPENSEARCH_TRACE, "entering stmt=%p rowsetSize=" FORMAT_LEN "\n", stmt,
          rowsetSize);

    if (!stmt) {
        SC_log_error(func, NULL_STRING, NULL);
        return SQL_INVALID_HANDLE;
    }

    /* if (SC_is_fetchcursor(stmt) && !stmt->manual_result) */
    if ((SQL_CURSOR_FORWARD_ONLY != stmt->options.cursor_type)
        || (fFetchType != SQL_FETCH_NEXT)) {
        SC_set_error(stmt, STMT_FETCH_OUT_OF_RANGE,
                     "Only SQL_CURSOR_FORWARD_ONLY with SQL_FETCH_NEXT "
                     "cursor's are supported.",
                     func);
        return SQL_ERROR;
    }

    SC_clear_error(stmt);

    if (!(res = SC_get_Curres(stmt), res)) {
        SC_set_error(stmt, STMT_INVALID_CURSOR_STATE_ERROR,
                     "Null statement result in OPENSEARCHAPI_ExtendedFetch.", func);
        return SQL_ERROR;
    }

    opts = SC_get_ARDF(stmt);
    /*
     * If a bookmark column is bound but bookmark usage is off, then error.
     */
    if ((bookmark = opts->bookmark, bookmark) && bookmark->buffer
        && stmt->options.use_bookmarks == SQL_UB_OFF) {
        SC_set_error(
            stmt, STMT_COLNUM_ERROR,
            "Attempt to retrieve bookmark with bookmark usage disabled", func);
        return SQL_ERROR;
    }

    if (stmt->status == STMT_EXECUTING) {
        SC_set_error(stmt, STMT_SEQUENCE_ERROR,
                     "Can't fetch while statement is still executing.", func);
        return SQL_ERROR;
    }

    if (stmt->status != STMT_FINISHED) {
        SC_set_error(stmt, STMT_STATUS_ERROR,
                     "ExtendedFetch can only be called after the successful "
                     "execution on a SQL statement",
                     func);
        return SQL_ERROR;
    }

    if (opts->bindings == NULL) {
        if (!SC_may_fetch_rows(stmt))
            return SQL_NO_DATA_FOUND;
        /* just to avoid a crash if the user insists on calling this */
        /* function even if SQL_ExecDirect has reported an Error */
        SC_set_error(stmt, STMT_INVALID_CURSOR_STATE_ERROR,
                     "Bindings were not allocated properly.", func);
        return SQL_ERROR;
    }

    /* Initialize to no rows fetched */
    if (rgfRowStatus)
        for (i = 0; i < rowsetSize; i++)
            *(rgfRowStatus + i) = SQL_ROW_NOROW;

    if (pcrow)
        *pcrow = 0;

    useCursor = (SC_is_fetchcursor(stmt) && NULL != QR_get_cursor(res));
    num_tuples = QR_get_num_total_tuples(res);
    reached_eof = QR_once_reached_eof(res) && QR_get_cursor(res);
    if (useCursor && !reached_eof)
        num_tuples = INT_MAX;

    MYLOG(OPENSEARCH_ALL, "num_tuples=" FORMAT_LEN "\n", num_tuples);
    /* Save and discard the saved rowset size */
    save_rowset_size = stmt->save_rowset_size;
    stmt->save_rowset_size = -1;
    rowset_start = SC_get_rowset_start(stmt);

    QR_stop_movement(res);
    res->move_offset = 0;
    switch (fFetchType) {
        case SQL_FETCH_NEXT:
            progress_size =
                (save_rowset_size > 0 ? save_rowset_size : rowsetSize);
            if (rowset_start < 0)
                SC_set_rowset_start(stmt, 0, TRUE);
            else if (res->keyset) {
                if (stmt->last_fetch_count <= progress_size) {
                    SC_inc_rowset_start(
                        stmt, stmt->last_fetch_count_include_ommitted);
                    progress_size -= stmt->last_fetch_count;
                }
                if (progress_size > 0) {
                    if (getNthValid(res, SC_get_rowset_start(stmt),
                                    SQL_FETCH_NEXT, progress_size + 1,
                                    &rowset_start)
                        <= 0) {
                        EXTFETCH_RETURN_EOF(stmt, res)
                    } else
                        should_set_rowset_start = TRUE;
                }
            } else
                SC_inc_rowset_start(stmt, progress_size);
            MYLOG(OPENSEARCH_DEBUG,
                  "SQL_FETCH_NEXT: num_tuples=" FORMAT_LEN
                  ", currtuple=" FORMAT_LEN ", rowst=" FORMAT_LEN "\n",
                  num_tuples, stmt->currTuple, rowset_start);
            break;
        default:
            SC_set_error(stmt, STMT_FETCH_OUT_OF_RANGE,
                         "Unsupported OPENSEARCHAPI_ExtendedFetch Direction", func);
            return SQL_ERROR;
    }

    /*
     * CHECK FOR PROPER CURSOR STATE
     */

    /*
     * Handle Declare Fetch style specially because the end is not really
     * the end...
     */
    if (!should_set_rowset_start)
        rowset_start = SC_get_rowset_start(stmt);

    // Get more results when cursor reaches end 
    { 
        ConnectionClass *conn = SC_get_conn(stmt);
        if (conn != NULL) {
            const SQLLEN end_rowset_size = rowset_start + rowsetSize;
            while ((end_rowset_size >= num_tuples)
                   && (NULL != res->server_cursor_id)) {
                GetNextResultSet(stmt);
                num_tuples = QR_get_num_total_tuples(res);
            }
        }
    }

    if (useCursor) {
        if (reached_eof && rowset_start >= num_tuples) {
            EXTFETCH_RETURN_EOF(stmt, res)
        }
    } else {
        /* If *new* rowset is after the result_set, return no data found */
        if (rowset_start >= num_tuples) {
            EXTFETCH_RETURN_EOF(stmt, res)
        }
    }
    /* If *new* rowset is prior to result_set, return no data found */
    if (rowset_start < 0) {
        if (rowset_start + rowsetSize <= 0) {
            EXTFETCH_RETURN_BOF(stmt, res)
        } else { /* overlap with beginning of result set,
                  * so get first rowset */
            SC_set_rowset_start(stmt, 0, TRUE);
        }
        should_set_rowset_start = FALSE;
    }

#ifdef __APPLE__
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wkeyword-macro"
#endif  // __APPLE__
#define return DONT_CALL_RETURN_FROM_HERE ? ? ?
#ifdef __APPLE__
#pragma clang diagnostic pop
#endif  // __APPLE__
    /* set the rowset_start if needed */
    if (should_set_rowset_start)
        SC_set_rowset_start(stmt, rowset_start, TRUE);
    if (rowset_end < 0 && QR_haskeyset(res)) {
        getNthValid(res, rowset_start, SQL_FETCH_NEXT, rowsetSize, &rowset_end);
        reqsize = rowset_end - rowset_start + 1;
    }
    QR_set_reqsize(res, (Int4)reqsize);
    /* currTuple is always 1 row prior to the rowset start */
    stmt->currTuple = RowIdx2GIdx(-1, stmt);
    QR_set_rowstart_in_cache(res, SC_get_rowset_start(stmt));

    /* Physical Row advancement occurs for each row fetched below */

    MYLOG(OPENSEARCH_DEBUG, "new currTuple = " FORMAT_LEN "\n", stmt->currTuple);

    truncated = error = FALSE;

    currp = -1;
    stmt->bind_row = 0; /* set the binding location */
    result = SC_fetch(stmt);
    if (SQL_ERROR == result)
        goto cleanup;
    if (SQL_NO_DATA_FOUND != result && res->keyset) {
        currp = GIdx2KResIdx(SC_get_rowset_start(stmt), stmt, res);
        MYLOG(OPENSEARCH_ALL, "currp=" FORMAT_LEN "\n", currp);
        if (currp < 0) {
            result = SQL_ERROR;
            MYLOG(OPENSEARCH_DEBUG,
                  "rowset_start=" FORMAT_LEN " but currp=" FORMAT_LEN "\n",
                  SC_get_rowset_start(stmt), currp);
            SC_set_error(stmt, STMT_INTERNAL_ERROR,
                         "rowset_start not in the keyset", func);
            goto cleanup;
        }
    }
    for (i = 0, fc_io = 0; SQL_NO_DATA_FOUND != result && SQL_ERROR != result;
         currp++) {
        fc_io++;
        currp_is_valid = FALSE;
        if (res->keyset) {
            if ((SQLULEN)currp < res->num_cached_keys) {
                currp_is_valid = TRUE;
                res->keyset[currp].status &=
                    ~CURS_IN_ROWSET; /* Off the flag first */
            } else {
                MYLOG(OPENSEARCH_DEBUG, "Umm current row is out of keyset\n");
                break;
            }
        }
        MYLOG(OPENSEARCH_ALL, "ExtFetch result=%d\n", result);
        if (currp_is_valid && SQL_SUCCESS_WITH_INFO == result
            && 0 == stmt->last_fetch_count) {
            MYLOG(OPENSEARCH_ALL, "just skipping deleted row " FORMAT_LEN "\n", currp);
            if (rowsetSize - i + fc_io > reqsize)
                QR_set_reqsize(res, (Int4)(rowsetSize - i + fc_io));
            result = SC_fetch(stmt);
            if (SQL_ERROR == result)
                break;
            continue;
        }

        /* Determine Function status */
        if (result == SQL_SUCCESS_WITH_INFO)
            truncated = TRUE;
        else if (result == SQL_ERROR)
            error = TRUE;

        /* Determine Row Status */
        if (rgfRowStatus) {
            if (result == SQL_ERROR)
                *(rgfRowStatus + i) = SQL_ROW_ERROR;
            else if (currp_is_valid) {
                pstatus = (res->keyset[currp].status & KEYSET_INFO_PUBLIC);
                if (pstatus != 0 && pstatus != SQL_ROW_ADDED) {
                    rgfRowStatus[i] = pstatus;
                } else
                    rgfRowStatus[i] = SQL_ROW_SUCCESS;
                /* refresh the status */
                /* if (SQL_ROW_DELETED != pstatus) */
                res->keyset[currp].status &= (~KEYSET_INFO_PUBLIC);
            } else
                *(rgfRowStatus + i) = SQL_ROW_SUCCESS;
        }
        if (SQL_ERROR != result && currp_is_valid)
            res->keyset[currp].status |=
                CURS_IN_ROWSET; /* This is the unique place where the
                                   CURS_IN_ROWSET bit is turned on */
        i++;
        if (i >= rowsetSize)
            break;
        stmt->bind_row = (SQLSETPOSIROW)i; /* set the binding location */
        result = SC_fetch(stmt);
    }
    if (SQL_ERROR == result)
        goto cleanup;

    /* Save the fetch count for SQLSetPos */
    stmt->last_fetch_count = i;
    stmt->save_rowset_size = rowsetSize;
    /*
    currp = KResIdx2GIdx(currp, stmt, res);
    stmt->last_fetch_count_include_ommitted = GIdx2RowIdx(currp, stmt);
    */
    stmt->last_fetch_count_include_ommitted = fc_io;

    /* Reset next binding row */
    stmt->bind_row = 0;

    /* Move the cursor position to the first row in the result set. */
    stmt->currTuple = RowIdx2GIdx(0, stmt);

    /* For declare/fetch, need to reset cursor to beginning of rowset */
    if (useCursor)
        QR_set_position(res, 0);

    /* Set the number of rows retrieved */
    if (pcrow)
        *pcrow = i;
    MYLOG(OPENSEARCH_ALL, "pcrow=" FORMAT_LEN "\n", i);

    if (i == 0)
        /* Only DeclareFetch should wind up here */
        result = SQL_NO_DATA_FOUND;
    else if (error)
        result = SQL_ERROR;
    else if (truncated)
        result = SQL_SUCCESS_WITH_INFO;
    else if (SC_get_errornumber(stmt) == STMT_POS_BEFORE_RECORDSET)
        result = SQL_SUCCESS_WITH_INFO;
    else
        result = SQL_SUCCESS;

cleanup:
#undef return
    return result;
}