in storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp [1698:2598]
bool Dbtup::scanNext(Signal *signal, ScanOpPtr scanPtr) {
ScanOp &scan = *scanPtr.p;
ScanPos &pos = scan.m_scanPos;
Local_key &key = pos.m_key;
const Uint32 bits = scan.m_bits;
// table
TablerecPtr tablePtr;
tablePtr.i = scan.m_tableId;
ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
Tablerec &table = *tablePtr.p;
m_curr_tabptr = tablePtr;
// fragment
FragrecordPtr fragPtr;
fragPtr.i = scan.m_fragPtrI;
ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
Fragrecord &frag = *fragPtr.p;
m_curr_fragptr = fragPtr;
// tuple found
Tuple_header *tuple_header_ptr = 0;
Uint32 thbits = 0;
Uint32 loop_count = 0;
Uint32 foundGCI;
const bool mm_index = (bits & ScanOp::SCAN_DD);
const bool lcp = (bits & ScanOp::SCAN_LCP);
const Uint32 size = ((bits & ScanOp::SCAN_VS) == 0)
? table.m_offsets[mm_index].m_fix_header_size
: 1;
const Uint32 first = ((bits & ScanOp::SCAN_VS) == 0) ? 0 : 1;
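  /**
   * On fixed-size pages, size is the row step in words (the fixed header
   * size) and m_page_idx is a word offset into the page data; on var-sized
   * pages size is 1 and m_page_idx is an index into the page's entries.
   * first is the page index at which scanning starts on each page
   * (0 for fixed-size pages, 1 for var-sized pages).
   */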
if (lcp && !fragPtr.p->m_lcp_keep_list_head.isNull()) {
jam();
/**
* Handle lcp keep list here too, due to scanCont
*/
/* Coverage tested */
ndbassert(!m_is_query_block);
handle_lcp_keep(signal, fragPtr, scanPtr.p);
scan.m_last_seen = __LINE__;
return false;
}
switch (pos.m_get) {
case ScanPos::Get_next_tuple:
jam();
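      /**
       * A row at m_key was returned to the caller before the break.
       * Step past it and reset the cached real page id so that the page
       * is looked up again.
       */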
key.m_page_idx += size;
pos.m_get = ScanPos::Get_page;
pos.m_realpid_mm = RNIL;
break;
case ScanPos::Get_tuple:
jam();
/**
* We need to refetch page after timeslice
*/
pos.m_get = ScanPos::Get_page;
pos.m_realpid_mm = RNIL;
break;
default:
break;
}
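  /**
   * The loop below is a state machine over pos.m_get:
   * - Get_next_page / Get_page dispatch to the main memory (mm) or
   *   disk data (dd) variants depending on SCAN_DD.
   * - Get_next_page_mm / Get_page_mm and Get_next_page_dd / Get_page_dd
   *   advance to the next logical page and map it to a real page.
   * - Get_next_tuple / Get_tuple iterate over the rows on the current page.
   * Each round either returns (row found, scan ended or error), breaks to
   * the loop count check at the bottom, or continues with a new state.
   */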
while (true) {
switch (pos.m_get) {
case ScanPos::Get_next_page:
// move to next page
jam();
{
if (!(bits & ScanOp::SCAN_DD))
pos.m_get = ScanPos::Get_next_page_mm;
else
pos.m_get = ScanPos::Get_next_page_dd;
}
continue;
case ScanPos::Get_page:
// get real page
jam();
{
if (!(bits & ScanOp::SCAN_DD))
pos.m_get = ScanPos::Get_page_mm;
else
pos.m_get = ScanPos::Get_page_dd;
}
continue;
case ScanPos::Get_next_page_mm:
// move to next logical TUP page
jam();
{
/**
* Code for future activation, see below for more details.
* bool break_flag;
* break_flag = false;
*/
key.m_page_no++;
if (likely(bits & ScanOp::SCAN_LCP)) {
jam();
/* Coverage tested path */
/**
             * We could be scanning for a long time and only finding LCP_SKIP
             * records, so we need to keep the LCP watchdog aware that we are
             * progressing. We do this by reporting each move to a new page,
             * using the id of the next page to scan.
*/
c_backup->update_lcp_pages_scanned(
signal, c_lqh->get_scan_api_op_ptr(scan.m_userPtr),
key.m_page_no, scan.m_scanGCI,
pos.m_lcp_scan_changed_rows_page);
scan.m_last_seen = __LINE__;
}
if (unlikely(key.m_page_no >= frag.m_max_page_cnt)) {
if ((bits & ScanOp::SCAN_NR) && (scan.m_endPage != RNIL)) {
if (key.m_page_no < scan.m_endPage) {
jam();
DEB_NR_SCAN(("scanning page %u", key.m_page_no));
goto cont;
}
jam();
// no more pages, scan ends
pos.m_get = ScanPos::Get_undef;
scan.m_state = ScanOp::Last;
return true;
} else if (bits & ScanOp::SCAN_LCP &&
key.m_page_no < scan.m_endPage) {
/**
* We come here with ScanOp::SCAN_LCP set AND
* frag.m_max_page_cnt < scan.m_endPage. In this case
* it is still ok to finish the LCP scan. The missing
               * pages are handled when they are dropped, so before
               * we drop a page we record all entries that need
               * recording for the LCP. These have been sent to the
               * LCP keep list. Since the LCP keep list is empty when
               * we come here, we are done with the scan.
*
* We will however continue the scan for LCP scans. The
* reason is that we might have set the LCP_SCANNED_BIT
* on pages already dropped. So we need to continue scanning
* to ensure that all the lcp scanned bits are reset.
*
* For the moment this code is unreachable since m_max_page_cnt
* cannot decrease. Thus m_max_page_cnt cannot be smaller
* than scan.m_endPage since scan.m_endPage is initialised to
* m_max_page_cnt at start of scan.
*
               * Since this is not yet implemented, we guard this
               * code path with an ndbabort instead.
*
* We keep the code as comments to be activated when we implement
* the possibility to release pages in the directory.
*/
ndbabort();
/* We will not scan this page, so reset flag immediately */
// reset_lcp_scanned_bit(fragPtr.p, key.m_page_no);
// scan.m_last_seen = __LINE__;
// break_flag = true;
} else {
// no more pages, scan ends
pos.m_get = ScanPos::Get_undef;
scan.m_last_seen = __LINE__;
scan.m_state = ScanOp::Last;
return true;
}
}
if (unlikely((bits & ScanOp::SCAN_LCP) &&
(key.m_page_no >= scan.m_endPage))) {
jam();
/**
             * We have arrived at a page number that didn't exist at the start
             * of the LCP, so we can quit the LCP scan since there cannot be
             * any more pages containing rows to be saved in the LCP.
*/
// no more pages, scan ends
pos.m_get = ScanPos::Get_undef;
scan.m_last_seen = __LINE__;
scan.m_state = ScanOp::Last;
return true;
}
/**
* Activate this code if we implement support for decreasing
* frag.m_max_page_cnt
*
* if (break_flag)
* {
* jam();
* pos.m_get = ScanPos::Get_next_page_mm;
* scan.m_last_seen = __LINE__;
* break; // incr loop count
* }
*/
cont:
key.m_page_idx = first;
pos.m_get = ScanPos::Get_page_mm;
// clear cached value
pos.m_realpid_mm = RNIL;
}
[[fallthrough]];
case ScanPos::Get_page_mm:
// get TUP real page
{
PagePtr pagePtr;
loop_count += 4;
if (pos.m_realpid_mm == RNIL) {
Uint32 *next_ptr, *prev_ptr;
if (bits & ScanOp::SCAN_LCP) {
jam();
pos.m_realpid_mm = getRealpidScan(fragPtr.p, key.m_page_no,
&next_ptr, &prev_ptr);
Uint32 ret_val =
prepare_lcp_scan_page(scan, key, next_ptr, prev_ptr);
if (ret_val == ZSCAN_FOUND_PAGE_END)
break;
else if (ret_val == ZSCAN_FOUND_DROPPED_CHANGE_PAGE)
goto record_dropped_change_page;
/* else continue */
} else if (bits & ScanOp::SCAN_NR) {
pos.m_realpid_mm = getRealpidScan(fragPtr.p, key.m_page_no,
&next_ptr, &prev_ptr);
if (unlikely(pos.m_realpid_mm == RNIL)) {
jam();
pagePtr.p = nullptr;
goto nopage;
}
} else {
/**
               * Ensure that we access the page map with protection from
               * the query thread; there is no need for this protection
               * from the LDM thread.
*/
acquire_frag_page_map_mutex_read(fragPtr.p);
pos.m_realpid_mm = getRealpidCheck(fragPtr.p, key.m_page_no);
release_frag_page_map_mutex_read(fragPtr.p);
if (unlikely(pos.m_realpid_mm == RNIL)) {
jam();
pos.m_get = ScanPos::Get_next_page_mm;
break; // incr loop count
}
jam();
}
} else {
jam();
}
ndbrequire(c_page_pool.getPtr(pagePtr, pos.m_realpid_mm));
/**
         * We are in the process of performing a full table scan. This can be
         * due to a user requesting a full table scan, to Node Recovery where
         * we are assisting the starting node to be synchronized (SCAN_NR set),
         * or to an LCP scan (SCAN_LCP set).
         *
         * We know that we will touch all cache lines where there is a tuple
         * header, and all scans using main memory pages are done on the fixed
         * pages. To speed up scan processing we prefetch such that we are
         * always a few tuples ahead. We prefetch 4 tuples ahead here and then
         * one more at each new tuple we get to. We only need to initialise by
         * prefetching 3 rows ahead since we will immediately prefetch the
         * fourth one before looking at the first row.
*
* PREFETCH_SCAN_TUPLE:
*/
if (likely((key.m_page_idx + (size * 3)) <= Fix_page::DATA_WORDS)) {
struct Tup_fixsize_page *page_ptr =
(struct Tup_fixsize_page *)pagePtr.p;
NDB_PREFETCH_READ(page_ptr->get_ptr(key.m_page_idx, size));
NDB_PREFETCH_READ(page_ptr->get_ptr(key.m_page_idx + size, size));
NDB_PREFETCH_READ(
page_ptr->get_ptr(key.m_page_idx + (size * 2), size));
}
if (bits & ScanOp::SCAN_LCP) {
if (pagePtr.p->is_page_to_skip_lcp()) {
Uint32 ret_val = handle_lcp_skip_page(scan, key, pagePtr.p);
if (ret_val == ZSCAN_FOUND_PAGE_END) {
jamDebug();
break;
} else {
jamDebug();
ndbrequire(ret_val == ZSCAN_FOUND_DROPPED_CHANGE_PAGE);
goto record_dropped_change_page;
}
} else if (pos.m_lcp_scan_changed_rows_page) {
/* CHANGE page is accessed */
if (key.m_page_idx == 0) {
jamDebug();
/* First access of a CHANGE page */
Uint32 ret_val = setup_change_page_for_scan(
scan, (Fix_page *)pagePtr.p, key, size);
if (ret_val == ZSCAN_FOUND_PAGE_END) {
jamDebug();
/* No changes found on page level bitmaps */
break;
} else {
ndbrequire(ret_val == ZSCAN_FOUND_TUPLE);
}
}
} else {
/* LCP ALL page is accessed */
jamDebug();
/**
               * Make sure these variables have defined values in case we were
               * to enter the wrong path for some reason. These values will
               * lead to a crash if we try to run the CHANGE page code for an
               * ALL page.
*/
pos.m_all_rows = false;
pos.m_next_small_area_check_idx = RNIL;
pos.m_next_large_area_check_idx = RNIL;
}
}
/* LCP normal case 4a) above goes here */
nopage:
pos.m_page = pagePtr.p;
pos.m_get = ScanPos::Get_tuple;
}
continue;
case ScanPos::Get_next_page_dd:
// move to next disk page
jam();
{
Disk_alloc_info &alloc = frag.m_disk_alloc_info;
Local_fragment_extent_list list(c_extent_pool, alloc.m_extent_list);
Ptr<Extent_info> ext_ptr;
ndbrequire(c_extent_pool.getPtr(ext_ptr, pos.m_extent_info_ptr_i));
Extent_info *ext = ext_ptr.p;
key.m_page_no++;
if (key.m_page_no >= ext->m_first_page_no + alloc.m_extent_size) {
// no more pages in this extent
jam();
if (!list.next(ext_ptr)) {
// no more extents, scan ends
jam();
pos.m_get = ScanPos::Get_undef;
scan.m_state = ScanOp::Last;
return true;
} else {
// move to next extent
jam();
pos.m_extent_info_ptr_i = ext_ptr.i;
ext = c_extent_pool.getPtr(pos.m_extent_info_ptr_i);
key.m_file_no = ext->m_key.m_file_no;
key.m_page_no = ext->m_first_page_no;
}
}
key.m_page_idx = first;
pos.m_get = ScanPos::Get_page_dd;
/*
read ahead for scan in disk order
          do read ahead every 8th page
*/
if ((bits & ScanOp::SCAN_DD) &&
(((key.m_page_no - ext->m_first_page_no) & 7) == 0)) {
jam();
// initialize PGMAN request
Page_cache_client::Request preq;
preq.m_page = pos.m_key;
preq.m_callback = TheNULLCallback;
// set maximum read ahead
Uint32 read_ahead = m_max_page_read_ahead;
while (true) {
// prepare page read ahead in current extent
Uint32 page_no = preq.m_page.m_page_no;
Uint32 page_no_limit = page_no + read_ahead;
Uint32 limit = ext->m_first_page_no + alloc.m_extent_size;
if (page_no_limit > limit) {
jam();
// read ahead crosses extent, set limit for this extent
read_ahead = page_no_limit - limit;
page_no_limit = limit;
// and make sure we only read one extra extent next time around
if (read_ahead > alloc.m_extent_size)
read_ahead = alloc.m_extent_size;
} else {
jam();
read_ahead = 0; // no more to read ahead after this
}
// do read ahead pages for this extent
while (page_no < page_no_limit) {
// page request to PGMAN
jam();
preq.m_page.m_page_no = page_no;
preq.m_table_id = frag.fragTableId;
preq.m_fragment_id = frag.fragmentId;
int flags = Page_cache_client::DISK_SCAN;
// ignore result
Page_cache_client pgman(this, c_pgman);
pgman.get_page(signal, preq, flags);
jamEntry();
page_no++;
}
if (!read_ahead || !list.next(ext_ptr)) {
// no more extents after this or read ahead done
jam();
break;
}
// move to next extent and initialize PGMAN request accordingly
Extent_info *ext = c_extent_pool.getPtr(ext_ptr.i);
preq.m_page.m_file_no = ext->m_key.m_file_no;
preq.m_page.m_page_no = ext->m_first_page_no;
}
} // if ScanOp::SCAN_DD read ahead
}
[[fallthrough]];
case ScanPos::Get_page_dd:
// get global page in PGMAN cache
jam();
{
// check if page is un-allocated or empty
if (likely(!(bits & ScanOp::SCAN_NR))) {
D("Tablespace_client - scanNext");
Tablespace_client tsman(
signal, this, c_tsman, frag.fragTableId, frag.fragmentId,
c_lqh->getCreateSchemaVersion(frag.fragTableId),
frag.m_tablespace_id);
unsigned uncommitted, committed;
uncommitted = committed = ~(unsigned)0;
int ret = tsman.get_page_free_bits(&key, &uncommitted, &committed);
ndbrequire(ret == 0);
if (committed == 0 && uncommitted == 0) {
// skip empty page
jam();
pos.m_get = ScanPos::Get_next_page_dd;
break; // incr loop count
}
}
// page request to PGMAN
Page_cache_client::Request preq;
preq.m_page = pos.m_key;
preq.m_table_id = frag.fragTableId;
preq.m_fragment_id = frag.fragmentId;
preq.m_callback.m_callbackData = scanPtr.i;
preq.m_callback.m_callbackFunction =
safe_cast(&Dbtup::disk_page_tup_scan_callback);
int flags = Page_cache_client::DISK_SCAN;
Page_cache_client pgman(this, c_pgman);
Ptr<GlobalPage> pagePtr;
int res = pgman.get_page(signal, preq, flags);
pagePtr = pgman.m_ptr;
jamEntry();
if (res == 0) {
jam();
// request queued
pos.m_get = ScanPos::Get_tuple;
return false;
} else if (res < 0) {
jam();
if (res == -1) {
jam();
m_scan_error_code = Uint32(~0);
} else {
jam();
res = -res;
m_scan_error_code = res;
}
/* Flag to reply code that we have an error */
scan.m_state = ScanOp::Invalid;
return true;
}
ndbrequire(res > 0);
pos.m_page = (Page *)pagePtr.p;
}
pos.m_get = ScanPos::Get_tuple;
continue;
// get tuple
// move to next tuple
case ScanPos::Get_next_tuple:
// move to next fixed size tuple
jam();
{
key.m_page_idx += size;
pos.m_get = ScanPos::Get_tuple;
}
[[fallthrough]];
case ScanPos::Get_tuple:
// get fixed size tuple
jam();
if ((bits & ScanOp::SCAN_VS) == 0) {
Fix_page *page = (Fix_page *)pos.m_page;
if (key.m_page_idx + size <= Fix_page::DATA_WORDS) {
pos.m_get = ScanPos::Get_next_tuple;
if (unlikely((bits & ScanOp::SCAN_NR) &&
pos.m_realpid_mm == RNIL)) {
/**
             * pos.m_page isn't initialized on this path, so handle it early.
             * We're doing a node restart and we are scanning beyond our
             * existing rowids since the starting node had those rowids
             * defined.
*/
jam();
foundGCI = 0;
goto found_deleted_rowid;
}
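          /**
           * Debug-only consistency check: verify that the cached real
           * page id still matches the page map for this logical page.
           */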
#ifdef VM_TRACE
if (!(bits & ScanOp::SCAN_DD)) {
acquire_frag_page_map_mutex_read(fragPtr.p);
Uint32 realpid = getRealpidCheck(fragPtr.p, key.m_page_no);
release_frag_page_map_mutex_read(fragPtr.p);
ndbrequire(pos.m_realpid_mm == realpid);
}
#endif
tuple_header_ptr = (Tuple_header *)&page->m_data[key.m_page_idx];
if ((key.m_page_idx + (size * 4)) <= Fix_page::DATA_WORDS) {
/**
             * Continue staying ahead of the scan on this page by prefetching
             * a row 4 tuples ahead of this tuple; we prefetched the first 3
             * at PREFETCH_SCAN_TUPLE above.
*/
struct Tup_fixsize_page *page_ptr =
(struct Tup_fixsize_page *)page;
NDB_PREFETCH_READ(
page_ptr->get_ptr(key.m_page_idx + (size * 3), size));
}
if (likely((!((bits & ScanOp::SCAN_NR) ||
(bits & ScanOp::SCAN_LCP))) ||
((bits & ScanOp::SCAN_LCP) &&
!pos.m_lcp_scan_changed_rows_page))) {
jam();
/**
* We come here for normal full table scans and also for LCP
* scans where we scan ALL ROWS pages.
*
             * We simply check if the row is free; if it isn't then we will
* handle it. For LCP scans we will also check at found_tuple that
* the LCP_SKIP bit isn't set. If it is then the rowid was empty
* at start of LCP. If the rowid is free AND we are scanning an
* ALL ROWS page then the LCP_SKIP cannot be set, this is set only
* for CHANGED ROWS pages when deleting tuples.
*
             * Free rowids might have existed at the start of the LCP. This
             * was handled by using the LCP keep list when the tuple was
             * deleted.
* So when we come here we don't have to worry about LCP scanning
* those rows.
*
* LCP_DELETE flag can never be set on ALL ROWS pages.
*
             * The state Tuple_header::ALLOC means that the row is being
             * inserted; it thus has no current committed state and is
             * here equivalent to the FREE state for LCP scans.
*
* We need to acquire the TUP fragment mutex before reading the
* tuple header bits. The reason for this is to ensure that
* we don't interact with INSERT operations that will
* manipulate the header bits during allocation of a new row.
*
             * If someone is inserting a row in this very position they will
             * hold the mutex, and thus acquiring the mutex here ensures that
             * query threads don't read a row in the middle of its insertion
             * process.
*/
acquire_frag_mutex_read(fragPtr.p, key.m_page_no);
thbits = tuple_header_ptr->m_header_bits;
release_frag_mutex_read(fragPtr.p, key.m_page_no);
if ((bits & ScanOp::SCAN_LCP) &&
(thbits & Tuple_header::LCP_DELETE)) {
g_eventLogger->info(
"(%u)LCP_DELETE on tab(%u,%u), row(%u,%u)"
" ALL ROWS page, header: %x",
instance(), fragPtr.p->fragTableId, fragPtr.p->fragmentId,
key.m_page_no, key.m_page_idx, thbits);
ndbabort();
}
if (!((thbits & Tuple_header::FREE ||
thbits & Tuple_header::DELETE_WAIT) ||
((bits & ScanOp::SCAN_LCP) &&
(thbits & Tuple_header::ALLOC)))) {
jam();
scan.m_last_seen = __LINE__;
goto found_tuple;
}
/**
             * Ensure that the LCP_SKIP bit is clear before we move on.
             * It could be set if the row was inserted after the LCP
             * started and then deleted before we arrive here.
*/
if ((bits & ScanOp::SCAN_LCP) &&
(thbits & Tuple_header::LCP_SKIP)) {
jam();
acquire_frag_mutex(fragPtr.p, key.m_page_no);
tuple_header_ptr->m_header_bits =
thbits & (~Tuple_header::LCP_SKIP);
DEB_LCP_SKIP(
("(%u)Reset LCP_SKIP on tab(%u,%u), row(%u,%u)"
", header: %x"
", new header: %x"
", tuple_header_ptr: %p",
instance(), fragPtr.p->fragTableId, fragPtr.p->fragmentId,
key.m_page_no, key.m_page_idx, thbits,
tuple_header_ptr->m_header_bits, tuple_header_ptr));
updateChecksum(tuple_header_ptr, tablePtr.p, thbits,
tuple_header_ptr->m_header_bits);
release_frag_mutex(fragPtr.p, key.m_page_no);
}
scan.m_last_seen = __LINE__;
} else if (bits & ScanOp::SCAN_NR) {
thbits = tuple_header_ptr->m_header_bits;
if ((foundGCI = *tuple_header_ptr->get_mm_gci(tablePtr.p)) >
scan.m_scanGCI ||
foundGCI == 0) {
/**
* foundGCI == 0 means that the row is initialised but has not
               * yet been committed as part of an insert transaction. All
               * other rows have the GCI entry set to the last GCI at which
               * they were changed; this is true even for deleted rows as
               * long as the page is still maintained by the fragment.
*
* When foundGCI == 0 there are two cases.
* The first case is that thbits == Fix_page::FREE_RECORD.
* In this case the tuple doesn't exist and should be
               * deleted if it exists in the starting node.
* As part of Fix_page::FREE_RECORD the Tuple_header::FREE
* bit is set. So this is handled below.
* The second case is that thbits == Tuple_header::ALLOC.
* In this case the tuple is currently being inserted, but the
* transaction isn't yet committed. In this case we will follow
* the found_tuple path. This means that we will attempt to
* lock the tuple, this will be unsuccessful since the row
* is currently being inserted and is locked for write.
* When the commit happens the row lock is released and the
* copy scan will continue on this row. It will send an INSERT
* to the starting node. Most likely the INSERT transaction
               * was started after the copy scan started; in this case the
* INSERT will simply be converted to an UPDATE by the starting
* node. If the insert was started before the new replica of
* the fragment was included, the INSERT will be performed.
* This is the reason why we have to go the extra mile here to
* ensure that we don't lose records that are being inserted as
* part of long transactions.
*
* The final problem is when the INSERT is aborted. In this case
               * we return from the row lock attempt in execACCKEYREF. Since
               * the row
* is now in the Tuple_header::FREE state we must re-read the
* row again. This is handled by changing the pos.m_get state
* to Get_tuple instead of Get_next_tuple.
*/
if (!(thbits & Tuple_header::FREE ||
thbits & Tuple_header::DELETE_WAIT)) {
jam();
goto found_tuple;
} else {
goto found_deleted_rowid;
}
} else if ((thbits & Fix_page::FREE_RECORD) !=
Fix_page::FREE_RECORD &&
tuple_header_ptr->m_operation_ptr_i != RNIL) {
jam();
goto found_tuple; // Locked tuple...
// skip free tuple
}
DEB_NR_SCAN_EXTRA(
("(%u)NR_SCAN_SKIP:tab(%u,%u) row(%u,%u),"
" recGCI: %u, scanGCI: %u, header: %x",
instance(), fragPtr.p->fragTableId, fragPtr.p->fragmentId,
key.m_page_no, key.m_page_idx, foundGCI, scan.m_scanGCI,
thbits));
} else {
ndbrequire(c_backup->is_partial_lcp_enabled());
ndbrequire((bits & ScanOp::SCAN_LCP) &&
pos.m_lcp_scan_changed_rows_page);
Uint32 ret_val;
if (!pos.m_all_rows) {
ret_val = move_to_next_change_page_row(
scan, page, &tuple_header_ptr, loop_count, size);
if (ret_val == ZSCAN_FOUND_PAGE_END) {
/**
                 * We have finished scanning a CHANGE page in the mode where
                 * we also check the individual parts of the page. In this
                 * mode we perform a very detailed analysis in which we clear
                 * all bits while scanning. To handle this we set a special
                 * bit if anyone updates any row on the page while we are
                 * scanning in this mode. This ensures that the flag bits are
                 * in read-only mode and only updated by LCP scanning. We
                 * don't track which part of the page is updated in this
                 * case, so if any updates have been performed on the page
                 * in this state, all bits on the page are set to ensure
                 * that we will scan the entire page in the next LCP scan.
*/
ndbassert(!page->get_any_changes());
page->clear_page_being_lcp_scanned();
if (page->get_and_clear_change_while_lcp_scan()) {
jamDebug();
page->set_all_change_map();
}
/**
* We've finished scanning a page that was using filtering
* using the bitmaps on the page. We are ready to set the last
* LCP state to A.
*/
/* Coverage tested */
set_last_lcp_state(fragPtr.p, key.m_page_no,
false /* Set state to A */);
scan.m_last_seen = __LINE__;
pos.m_get = ScanPos::Get_next_page;
break;
}
}
ret_val = handle_scan_change_page_rows(
scan, page, tuple_header_ptr, foundGCI, fragPtr.p);
if (likely(ret_val == ZSCAN_FOUND_TUPLE)) {
thbits = tuple_header_ptr->m_header_bits;
goto found_tuple;
} else if (ret_val == ZSCAN_FOUND_DELETED_ROWID)
goto found_deleted_rowid;
ndbrequire(ret_val == ZSCAN_FOUND_NEXT_ROW);
}
} else {
jam();
/**
           * We've finished scanning a page; for LCPs we are ready to
           * set the last LCP state to A.
*/
if (bits & ScanOp::SCAN_LCP) {
jam();
/* Coverage tested */
set_last_lcp_state(fragPtr.p, key.m_page_no,
false /* Set state to A */);
if (!pos.m_all_rows) {
ndbassert(page->verify_change_maps(jamBuffer()));
}
scan.m_last_seen = __LINE__;
}
// no more tuples on this page
pos.m_get = ScanPos::Get_next_page;
}
} else {
jam();
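        /**
         * Var-sized pages: m_page_idx indexes the page's entries.
         * Iterate up to high_index and skip entries that are free.
         */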
Var_page *page = (Var_page *)pos.m_page;
if (key.m_page_idx < page->high_index) {
jam();
pos.m_get = ScanPos::Get_next_tuple;
if (!page->is_free(key.m_page_idx)) {
tuple_header_ptr = (Tuple_header *)page->get_ptr(key.m_page_idx);
thbits = tuple_header_ptr->m_header_bits;
goto found_tuple;
}
} else {
jam();
// no more tuples on this page
pos.m_get = ScanPos::Get_next_page;
break;
}
}
break; // incr loop count
found_tuple:
// found possible tuple to return
jam();
{
// caller has already set pos.m_get to next tuple
if (likely(!(bits & ScanOp::SCAN_LCP &&
thbits & Tuple_header::LCP_SKIP))) {
Local_key &key_mm = pos.m_key_mm;
if (likely(!(bits & ScanOp::SCAN_DD))) {
key_mm = pos.m_key;
// real page id is already set
if (bits & ScanOp::SCAN_LCP) {
c_backup->update_pause_lcp_counter(loop_count);
}
} else {
/**
           * Disk data rows are only accessed in the LDM thread.
           * Thus there is no need to acquire the mutex for access here.
*/
ndbrequire(!m_is_in_query_thread);
tuple_header_ptr->get_base_record_ref(key_mm);
// recompute for each disk tuple
pos.m_realpid_mm = getRealpid(fragPtr.p, key_mm.m_page_no);
}
// TUPKEYREQ handles savepoint stuff
scan.m_state = ScanOp::Current;
return true;
} else {
jam();
/* Clear LCP_SKIP bit so that it will not show up in next LCP */
/**
* We need to use a mutex since otherwise readers could calculate
* the wrong checksum.
*/
acquire_frag_mutex(fragPtr.p, key.m_page_no);
tuple_header_ptr->m_header_bits =
thbits & ~(Uint32)Tuple_header::LCP_SKIP;
DEB_LCP_SKIP(
("(%u) 3 Reset LCP_SKIP on tab(%u,%u), row(%u,%u)"
", header: %x",
instance(), fragPtr.p->fragTableId, fragPtr.p->fragmentId,
key.m_page_no, key.m_page_idx, thbits));
updateChecksum(tuple_header_ptr, tablePtr.p, thbits,
tuple_header_ptr->m_header_bits);
release_frag_mutex(fragPtr.p, key.m_page_no);
scan.m_last_seen = __LINE__;
}
}
break;
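    /**
     * Reached via goto from the Get_page_mm handling above when
     * prepare_lcp_scan_page or handle_lcp_skip_page report a dropped
     * CHANGE page: the LCP then records a DELETE by PAGEID covering
     * the whole page.
     */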
record_dropped_change_page : {
ndbrequire(c_backup->is_partial_lcp_enabled());
c_backup->update_pause_lcp_counter(loop_count);
record_delete_by_pageid(signal, frag.fragTableId, frag.fragmentId, scan,
key.m_page_no, size, true);
return false;
}
found_deleted_rowid:
ndbrequire((bits & ScanOp::SCAN_NR) || (bits & ScanOp::SCAN_LCP));
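    /**
     * The deleted rowid is recorded below unless this is an LCP scan and
     * the last LCP state of the page is D (pos.m_is_last_lcp_state_D).
     */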
if (!(bits & ScanOp::SCAN_LCP && pos.m_is_last_lcp_state_D)) {
ndbrequire(bits & ScanOp::SCAN_NR ||
pos.m_lcp_scan_changed_rows_page);
Local_key &key_mm = pos.m_key_mm;
if (!(bits & ScanOp::SCAN_DD)) {
jam();
key_mm = pos.m_key;
// caller has already set pos.m_get to next tuple
// real page id is already set
} else {
jam();
/**
* Currently dead code since NR scans never use Disk data scans.
*/
ndbrequire(bits & ScanOp::SCAN_NR);
tuple_header_ptr->get_base_record_ref(key_mm);
// recompute for each disk tuple
pos.m_realpid_mm = getRealpid(fragPtr.p, key_mm.m_page_no);
Fix_page *mmpage = (Fix_page *)c_page_pool.getPtr(pos.m_realpid_mm);
tuple_header_ptr =
(Tuple_header *)(mmpage->m_data + key_mm.m_page_idx);
if ((foundGCI = *tuple_header_ptr->get_mm_gci(tablePtr.p)) >
scan.m_scanGCI ||
foundGCI == 0) {
thbits = tuple_header_ptr->m_header_bits;
if (!(thbits & Tuple_header::FREE ||
thbits & Tuple_header::DELETE_WAIT)) {
jam();
break;
}
jam();
}
}
/**
       * This code handles Node Recovery: the row might still exist at the
       * starting node although it no longer exists at this live node, so we
       * send a DELETE by ROWID to the starting node.
*
* This code is also used by LCPs to record deleted row ids.
*/
c_backup->update_pause_lcp_counter(loop_count);
record_delete_by_rowid(signal, frag.fragTableId, frag.fragmentId,
scan, pos.m_key_mm, foundGCI, true);
// TUPKEYREQ handles savepoint stuff
return false;
}
scan.m_last_seen = __LINE__;
break; // incr loop count
default:
ndbabort();
}
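    /**
     * Account for the work done in this round. Once roughly 512 units have
     * been consumed we either keep scanning (LCP scans that are not yet
     * required to pause) or break out of the loop and take a real-time
     * break by sending CONTINUEB below.
     */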
loop_count += 4;
if (loop_count >= 512) {
jam();
if (bits & ScanOp::SCAN_LCP) {
jam();
c_backup->update_pause_lcp_counter(loop_count);
if (!c_backup->check_pause_lcp()) {
loop_count = 0;
continue;
}
c_backup->pausing_lcp(5, loop_count);
}
break;
}
}
// TODO: at drop table we have to flush and terminate these
jam();
scan.m_last_seen = __LINE__;
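  /**
   * Real-time break: reschedule the scan with CONTINUEB, either directly
   * on the B-level job buffer or with a bounded delay, depending on
   * whether the scan is prioritised.
   */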
signal->theData[0] = ZTUP_SCAN;
signal->theData[1] = scanPtr.i;
if (!c_lqh->rt_break_is_scan_prioritised(scan.m_userPtr)) {
jam();
sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
} else {
/**
     * Sending with bounded delay means that we allow all signals in the job
     * buffer to be executed up to a maximum, which is currently 100 signals.
     * Thus sending with bounded delay gives a more predictable delay. It
     * might be longer than with priority B, but it will never be longer than
     * 100 signals.
*/
jam();
// #ifdef VM_TRACE
c_debug_count++;
if (c_debug_count % 10000 == 0) {
DEB_LCP_DELAY(("(%u)TupScan delayed 10000 times", instance()));
}
// #endif
sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, BOUNDED_DELAY, 2);
}
return false;
}