bool Dbtup::scanNext(Signal *signal, ScanOpPtr scanPtr)

in storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp [1698:2598]


bool Dbtup::scanNext(Signal *signal, ScanOpPtr scanPtr) {
  ScanOp &scan = *scanPtr.p;
  ScanPos &pos = scan.m_scanPos;
  Local_key &key = pos.m_key;
  const Uint32 bits = scan.m_bits;
  // table
  TablerecPtr tablePtr;
  tablePtr.i = scan.m_tableId;
  ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
  Tablerec &table = *tablePtr.p;
  m_curr_tabptr = tablePtr;
  // fragment
  FragrecordPtr fragPtr;
  fragPtr.i = scan.m_fragPtrI;
  ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
  Fragrecord &frag = *fragPtr.p;
  m_curr_fragptr = fragPtr;
  // tuple found
  Tuple_header *tuple_header_ptr = 0;
  Uint32 thbits = 0;
  Uint32 loop_count = 0;
  Uint32 foundGCI;

  const bool mm_index = (bits & ScanOp::SCAN_DD);
  const bool lcp = (bits & ScanOp::SCAN_LCP);

  const Uint32 size = ((bits & ScanOp::SCAN_VS) == 0)
                          ? table.m_offsets[mm_index].m_fix_header_size
                          : 1;
  const Uint32 first = ((bits & ScanOp::SCAN_VS) == 0) ? 0 : 1;
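  /**
   * The size/first values above define the scan step within a page and the
   * starting page index: fixed-size pages are stepped in units of the fixed
   * header size starting at index 0, while var-sized pages (SCAN_VS) are
   * stepped one directory index at a time starting at index 1.
   */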

  if (lcp && !fragPtr.p->m_lcp_keep_list_head.isNull()) {
    jam();
    /**
     * Handle lcp keep list here too, due to scanCont
     */
    /* Coverage tested */
    ndbassert(!m_is_query_block);
    handle_lcp_keep(signal, fragPtr, scanPtr.p);
    scan.m_last_seen = __LINE__;
    return false;
  }

  switch (pos.m_get) {
    case ScanPos::Get_next_tuple:
      jam();
      key.m_page_idx += size;
      pos.m_get = ScanPos::Get_page;
      pos.m_realpid_mm = RNIL;
      break;
    case ScanPos::Get_tuple:
      jam();
      /**
       * We need to refetch page after timeslice
       */
      pos.m_get = ScanPos::Get_page;
      pos.m_realpid_mm = RNIL;
      break;
    default:
      break;
  }
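
  /**
   * Main scan loop: pos.m_get drives a state machine that moves from page
   * selection (Get_next_page, Get_page) to tuple iteration (Get_next_tuple,
   * Get_tuple) and back. We return true when the caller must act on the new
   * scan state (row found, scan ended or error) and false when the scan is
   * continued later (real-time break, queued disk page read or a deleted
   * row being recorded).
   */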

  while (true) {
    switch (pos.m_get) {
      case ScanPos::Get_next_page:
        // move to next page
        jam();
        {
          if (!(bits & ScanOp::SCAN_DD))
            pos.m_get = ScanPos::Get_next_page_mm;
          else
            pos.m_get = ScanPos::Get_next_page_dd;
        }
        continue;
      case ScanPos::Get_page:
        // get real page
        jam();
        {
          if (!(bits & ScanOp::SCAN_DD))
            pos.m_get = ScanPos::Get_page_mm;
          else
            pos.m_get = ScanPos::Get_page_dd;
        }
        continue;
      case ScanPos::Get_next_page_mm:
        // move to next logical TUP page
        jam();
        {
          /**
           * Code for future activation, see below for more details.
           * bool break_flag;
           * break_flag = false;
           */
          key.m_page_no++;
          if (likely(bits & ScanOp::SCAN_LCP)) {
            jam();
            /* Coverage tested path */
            /**
             * We could be scanning for a long time and only finding LCP_SKIP
             * records. We need to keep the LCP watchdog aware that we are
             * progressing, so on each move to a new page we report the id of
             * the next page to scan.
             */
            c_backup->update_lcp_pages_scanned(
                signal, c_lqh->get_scan_api_op_ptr(scan.m_userPtr),
                key.m_page_no, scan.m_scanGCI,
                pos.m_lcp_scan_changed_rows_page);
            scan.m_last_seen = __LINE__;
          }
          if (unlikely(key.m_page_no >= frag.m_max_page_cnt)) {
            if ((bits & ScanOp::SCAN_NR) && (scan.m_endPage != RNIL)) {
              if (key.m_page_no < scan.m_endPage) {
                jam();
                DEB_NR_SCAN(("scanning page %u", key.m_page_no));
                goto cont;
              }
              jam();
              // no more pages, scan ends
              pos.m_get = ScanPos::Get_undef;
              scan.m_state = ScanOp::Last;
              return true;
            } else if (bits & ScanOp::SCAN_LCP &&
                       key.m_page_no < scan.m_endPage) {
              /**
               * We come here with ScanOp::SCAN_LCP set AND
               * frag.m_max_page_cnt < scan.m_endPage. In this case
               * it is still ok to finish the LCP scan. The missing
               * pages are handled when they are dropped, so before
               * we drop a page we record all entries that need
               * recording for the LCP. These have been sent to the
               * LCP keep list, and since the LCP keep list is empty
               * when we come here, we are done with the scan.
               *
               * We will however continue the scan for LCP scans. The
               * reason is that we might have set the LCP_SCANNED_BIT
               * on pages already dropped. So we need to continue scanning
               * to ensure that all the lcp scanned bits are reset.
               *
               * For the moment this code is unreachable since m_max_page_cnt
               * cannot decrease. Thus m_max_page_cnt cannot be smaller
               * than scan.m_endPage since scan.m_endPage is initialised to
               * m_max_page_cnt at start of scan.
               *
               * This is currently not implemented, so for now this
               * code path simply aborts (ndbabort) instead.
               *
               * We keep the code as comments to be activated when we implement
               * the possibility to release pages in the directory.
               */
              ndbabort();
              /* We will not scan this page, so reset flag immediately */
              // reset_lcp_scanned_bit(fragPtr.p, key.m_page_no);
              // scan.m_last_seen = __LINE__;
              // break_flag = true;
            } else {
              // no more pages, scan ends
              pos.m_get = ScanPos::Get_undef;
              scan.m_last_seen = __LINE__;
              scan.m_state = ScanOp::Last;
              return true;
            }
          }
          if (unlikely((bits & ScanOp::SCAN_LCP) &&
                       (key.m_page_no >= scan.m_endPage))) {
            jam();
            /**
             * We have arrived at a page number that didn't exist at start of
             * LCP, we can quit the LCP scan since we cannot find any more
             * pages that are containing rows to be saved in LCP.
             */
            // no more pages, scan ends
            pos.m_get = ScanPos::Get_undef;
            scan.m_last_seen = __LINE__;
            scan.m_state = ScanOp::Last;
            return true;
          }
          /**
           * Activate this code if we implement support for decreasing
           * frag.m_max_page_cnt
           *
           * if (break_flag)
           * {
           * jam();
           * pos.m_get = ScanPos::Get_next_page_mm;
           * scan.m_last_seen = __LINE__;
           * break; // incr loop count
           * }
           */
        cont:
          key.m_page_idx = first;
          pos.m_get = ScanPos::Get_page_mm;
          // clear cached value
          pos.m_realpid_mm = RNIL;
        }
        [[fallthrough]];
      case ScanPos::Get_page_mm:
        // get TUP real page
        {
          PagePtr pagePtr;
          loop_count += 4;
          if (pos.m_realpid_mm == RNIL) {
            Uint32 *next_ptr, *prev_ptr;
            if (bits & ScanOp::SCAN_LCP) {
              jam();
              pos.m_realpid_mm = getRealpidScan(fragPtr.p, key.m_page_no,
                                                &next_ptr, &prev_ptr);
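              /**
               * prepare_lcp_scan_page checks the page map state for this page:
               * ZSCAN_FOUND_PAGE_END means the page is fully handled at page
               * level and we move on, ZSCAN_FOUND_DROPPED_CHANGE_PAGE means a
               * dropped CHANGE page to be recorded, and otherwise the page is
               * scanned normally.
               */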
              Uint32 ret_val =
                  prepare_lcp_scan_page(scan, key, next_ptr, prev_ptr);
              if (ret_val == ZSCAN_FOUND_PAGE_END)
                break;
              else if (ret_val == ZSCAN_FOUND_DROPPED_CHANGE_PAGE)
                goto record_dropped_change_page;
              /* else continue */
            } else if (bits & ScanOp::SCAN_NR) {
              pos.m_realpid_mm = getRealpidScan(fragPtr.p, key.m_page_no,
                                                &next_ptr, &prev_ptr);
              if (unlikely(pos.m_realpid_mm == RNIL)) {
                jam();
                pagePtr.p = nullptr;
                goto nopage;
              }
            } else {
              /**
               * Ensure that we access the page map with protection from
               * the query thread, no need for this protection from LDM
               * thread.
               */
              acquire_frag_page_map_mutex_read(fragPtr.p);
              pos.m_realpid_mm = getRealpidCheck(fragPtr.p, key.m_page_no);
              release_frag_page_map_mutex_read(fragPtr.p);
              if (unlikely(pos.m_realpid_mm == RNIL)) {
                jam();
                pos.m_get = ScanPos::Get_next_page_mm;
                break;  // incr loop count
              }
              jam();
            }
          } else {
            jam();
          }
          ndbrequire(c_page_pool.getPtr(pagePtr, pos.m_realpid_mm));
          /**
           * We are in the process of performing a full table scan. This can be
           * due to a user requesting a full table scan, it can be part of Node
           * Recovery where we are assisting the starting node to get
           * synchronized (SCAN_NR set), and it is also used for LCP scans
           * (SCAN_LCP set).
           *
           * We know that we will touch all cache lines where there is a tuple
           * header, and all scans using main memory pages are done on the
           * fixed size pages. To speed up scan processing we prefetch such
           * that we are always a few tuples ahead. Here we prefetch 3 rows
           * (the current one and the next two) and then prefetch one more row
           * at each new tuple we get to. Prefetching 3 rows here is enough
           * since we immediately prefetch a fourth one before looking at the
           * first row.
           *
           * PREFETCH_SCAN_TUPLE:
           */
          if (likely((key.m_page_idx + (size * 3)) <= Fix_page::DATA_WORDS)) {
            struct Tup_fixsize_page *page_ptr =
                (struct Tup_fixsize_page *)pagePtr.p;
            NDB_PREFETCH_READ(page_ptr->get_ptr(key.m_page_idx, size));
            NDB_PREFETCH_READ(page_ptr->get_ptr(key.m_page_idx + size, size));
            NDB_PREFETCH_READ(
                page_ptr->get_ptr(key.m_page_idx + (size * 2), size));
          }
          if (bits & ScanOp::SCAN_LCP) {
            if (pagePtr.p->is_page_to_skip_lcp()) {
              Uint32 ret_val = handle_lcp_skip_page(scan, key, pagePtr.p);
              if (ret_val == ZSCAN_FOUND_PAGE_END) {
                jamDebug();
                break;
              } else {
                jamDebug();
                ndbrequire(ret_val == ZSCAN_FOUND_DROPPED_CHANGE_PAGE);
                goto record_dropped_change_page;
              }
            } else if (pos.m_lcp_scan_changed_rows_page) {
              /* CHANGE page is accessed */
              if (key.m_page_idx == 0) {
                jamDebug();
                /* First access of a CHANGE page */
                Uint32 ret_val = setup_change_page_for_scan(
                    scan, (Fix_page *)pagePtr.p, key, size);
                if (ret_val == ZSCAN_FOUND_PAGE_END) {
                  jamDebug();
                  /* No changes found on page level bitmaps */
                  break;
                } else {
                  ndbrequire(ret_val == ZSCAN_FOUND_TUPLE);
                }
              }
            } else {
              /* LCP ALL page is accessed */
              jamDebug();
              /**
               * Make sure these variables have well-defined values in case we
               * enter the wrong path for some reason. These values will lead
               * to a crash if we try to run the CHANGE page code for an ALL
               * page.
               */
              pos.m_all_rows = false;
              pos.m_next_small_area_check_idx = RNIL;
              pos.m_next_large_area_check_idx = RNIL;
            }
          }
          /* LCP normal case 4a) above goes here */

        nopage:
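          /**
           * For SCAN_NR the page may not exist (pagePtr.p == nullptr and
           * pos.m_realpid_mm == RNIL); the Get_tuple state detects this and
           * reports those rowids as deleted to the starting node.
           */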
          pos.m_page = pagePtr.p;
          pos.m_get = ScanPos::Get_tuple;
        }
        continue;
      case ScanPos::Get_next_page_dd:
        // move to next disk page
        jam();
        {
          Disk_alloc_info &alloc = frag.m_disk_alloc_info;
          Local_fragment_extent_list list(c_extent_pool, alloc.m_extent_list);
          Ptr<Extent_info> ext_ptr;
          ndbrequire(c_extent_pool.getPtr(ext_ptr, pos.m_extent_info_ptr_i));
          Extent_info *ext = ext_ptr.p;
          key.m_page_no++;
          if (key.m_page_no >= ext->m_first_page_no + alloc.m_extent_size) {
            // no more pages in this extent
            jam();
            if (!list.next(ext_ptr)) {
              // no more extents, scan ends
              jam();
              pos.m_get = ScanPos::Get_undef;
              scan.m_state = ScanOp::Last;
              return true;
            } else {
              // move to next extent
              jam();
              pos.m_extent_info_ptr_i = ext_ptr.i;
              ext = c_extent_pool.getPtr(pos.m_extent_info_ptr_i);
              key.m_file_no = ext->m_key.m_file_no;
              key.m_page_no = ext->m_first_page_no;
            }
          }
          key.m_page_idx = first;
          pos.m_get = ScanPos::Get_page_dd;
          /*
            Read ahead for scan in disk order.
            Do read ahead every 8th page.
          */
          if ((bits & ScanOp::SCAN_DD) &&
              (((key.m_page_no - ext->m_first_page_no) & 7) == 0)) {
            jam();
            // initialize PGMAN request
            Page_cache_client::Request preq;
            preq.m_page = pos.m_key;
            preq.m_callback = TheNULLCallback;

            // set maximum read ahead
            Uint32 read_ahead = m_max_page_read_ahead;

            while (true) {
              // prepare page read ahead in current extent
              Uint32 page_no = preq.m_page.m_page_no;
              Uint32 page_no_limit = page_no + read_ahead;
              Uint32 limit = ext->m_first_page_no + alloc.m_extent_size;
              if (page_no_limit > limit) {
                jam();
                // read ahead crosses extent, set limit for this extent
                read_ahead = page_no_limit - limit;
                page_no_limit = limit;
                // and make sure we only read one extra extent next time around
                if (read_ahead > alloc.m_extent_size)
                  read_ahead = alloc.m_extent_size;
              } else {
                jam();
                read_ahead = 0;  // no more to read ahead after this
              }
              // do read ahead pages for this extent
              while (page_no < page_no_limit) {
                // page request to PGMAN
                jam();
                preq.m_page.m_page_no = page_no;
                preq.m_table_id = frag.fragTableId;
                preq.m_fragment_id = frag.fragmentId;
                int flags = Page_cache_client::DISK_SCAN;
                // ignore result
                Page_cache_client pgman(this, c_pgman);
                pgman.get_page(signal, preq, flags);
                jamEntry();
                page_no++;
              }
              if (!read_ahead || !list.next(ext_ptr)) {
                // no more extents after this or read ahead done
                jam();
                break;
              }
              // move to next extent and initialize PGMAN request accordingly
              Extent_info *ext = c_extent_pool.getPtr(ext_ptr.i);
              preq.m_page.m_file_no = ext->m_key.m_file_no;
              preq.m_page.m_page_no = ext->m_first_page_no;
            }
          }  // if ScanOp::SCAN_DD read ahead
        }
        [[fallthrough]];
      case ScanPos::Get_page_dd:
        // get global page in PGMAN cache
        jam();
        {
          // check if page is un-allocated or empty
          if (likely(!(bits & ScanOp::SCAN_NR))) {
            D("Tablespace_client - scanNext");
            Tablespace_client tsman(
                signal, this, c_tsman, frag.fragTableId, frag.fragmentId,
                c_lqh->getCreateSchemaVersion(frag.fragTableId),
                frag.m_tablespace_id);
            unsigned uncommitted, committed;
            uncommitted = committed = ~(unsigned)0;
            int ret = tsman.get_page_free_bits(&key, &uncommitted, &committed);
            ndbrequire(ret == 0);
            if (committed == 0 && uncommitted == 0) {
              // skip empty page
              jam();
              pos.m_get = ScanPos::Get_next_page_dd;
              break;  // incr loop count
            }
          }
          // page request to PGMAN
          Page_cache_client::Request preq;
          preq.m_page = pos.m_key;
          preq.m_table_id = frag.fragTableId;
          preq.m_fragment_id = frag.fragmentId;
          preq.m_callback.m_callbackData = scanPtr.i;
          preq.m_callback.m_callbackFunction =
              safe_cast(&Dbtup::disk_page_tup_scan_callback);
          int flags = Page_cache_client::DISK_SCAN;
          Page_cache_client pgman(this, c_pgman);
          Ptr<GlobalPage> pagePtr;
          int res = pgman.get_page(signal, preq, flags);
          pagePtr = pgman.m_ptr;
          jamEntry();
          if (res == 0) {
            jam();
            // request queued
            pos.m_get = ScanPos::Get_tuple;
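            // PGMAN will execute disk_page_tup_scan_callback once the page
            // read completes; keeping the state at Get_tuple means the scan
            // picks up this row when it resumes.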
            return false;
          } else if (res < 0) {
            jam();
            if (res == -1) {
              jam();
              m_scan_error_code = Uint32(~0);
            } else {
              jam();
              res = -res;
              m_scan_error_code = res;
            }
            /* Flag to reply code that we have an error */
            scan.m_state = ScanOp::Invalid;
            return true;
          }
          ndbrequire(res > 0);
          pos.m_page = (Page *)pagePtr.p;
        }
        pos.m_get = ScanPos::Get_tuple;
        continue;
        // get tuple
        // move to next tuple
      case ScanPos::Get_next_tuple:
        // move to next fixed size tuple
        jam();
        {
          key.m_page_idx += size;
          pos.m_get = ScanPos::Get_tuple;
        }
        [[fallthrough]];
      case ScanPos::Get_tuple:
        // get fixed size tuple
        jam();
        if ((bits & ScanOp::SCAN_VS) == 0) {
          Fix_page *page = (Fix_page *)pos.m_page;
          if (key.m_page_idx + size <= Fix_page::DATA_WORDS) {
            pos.m_get = ScanPos::Get_next_tuple;
            if (unlikely((bits & ScanOp::SCAN_NR) &&
                         pos.m_realpid_mm == RNIL)) {
              /**
               * pos.m_page isn't initialized on this path, so handle it early.
               * We're doing a node restart and we are scanning beyond our
               * existing rowids since the starting node had those rowids
               * defined.
               */
              jam();
              foundGCI = 0;
              goto found_deleted_rowid;
            }
#ifdef VM_TRACE
            if (!(bits & ScanOp::SCAN_DD)) {
              acquire_frag_page_map_mutex_read(fragPtr.p);
              Uint32 realpid = getRealpidCheck(fragPtr.p, key.m_page_no);
              release_frag_page_map_mutex_read(fragPtr.p);
              ndbrequire(pos.m_realpid_mm == realpid);
            }
#endif
            tuple_header_ptr = (Tuple_header *)&page->m_data[key.m_page_idx];

            if ((key.m_page_idx + (size * 4)) <= Fix_page::DATA_WORDS) {
              /**
               * Continue staying ahead of the scan on this page by prefetching
               * the row 3 tuple slots ahead of this one; the first 3 rows were
               * prefetched at PREFETCH_SCAN_TUPLE.
               */
              struct Tup_fixsize_page *page_ptr =
                  (struct Tup_fixsize_page *)page;
              NDB_PREFETCH_READ(
                  page_ptr->get_ptr(key.m_page_idx + (size * 3), size));
            }
            if (likely((!((bits & ScanOp::SCAN_NR) ||
                          (bits & ScanOp::SCAN_LCP))) ||
                       ((bits & ScanOp::SCAN_LCP) &&
                        !pos.m_lcp_scan_changed_rows_page))) {
              jam();
              /**
               * We come here for normal full table scans and also for LCP
               * scans where we scan ALL ROWS pages.
               *
               * We simply check if the row is free; if it isn't then we will
               * handle it. For LCP scans we will also check at found_tuple that
               * the LCP_SKIP bit isn't set. If it is then the rowid was empty
               * at start of LCP. If the rowid is free AND we are scanning an
               * ALL ROWS page then the LCP_SKIP cannot be set, this is set only
               * for CHANGED ROWS pages when deleting tuples.
               *
               * Free rowids might have existed at the start of the LCP. This
               * was handled by using the LCP keep list when the tuple was
               * deleted.
               * So when we come here we don't have to worry about LCP scanning
               * those rows.
               *
               * LCP_DELETE flag can never be set on ALL ROWS pages.
               *
               * The state Tuple_header::ALLOC means that the row is being
               * inserted; it thus has no current committed state and is
               * here equivalent to the FREE state for LCP scans.
               *
               * We need to acquire the TUP fragment mutex before reading the
               * tuple header bits. The reason for this is to ensure that
               * we don't interact with INSERT operations that will
               * manipulate the header bits during allocation of a new row.
               *
               * If someone is inserting a row in this very position they will
               * hold the mutex, and thus acquiring the mutex here ensures that
               * query threads don't read a row in the middle of its insertion
               * process.
               */
              acquire_frag_mutex_read(fragPtr.p, key.m_page_no);
              thbits = tuple_header_ptr->m_header_bits;
              release_frag_mutex_read(fragPtr.p, key.m_page_no);
              if ((bits & ScanOp::SCAN_LCP) &&
                  (thbits & Tuple_header::LCP_DELETE)) {
                g_eventLogger->info(
                    "(%u)LCP_DELETE on tab(%u,%u), row(%u,%u)"
                    " ALL ROWS page, header: %x",
                    instance(), fragPtr.p->fragTableId, fragPtr.p->fragmentId,
                    key.m_page_no, key.m_page_idx, thbits);
                ndbabort();
              }
              if (!((thbits & Tuple_header::FREE ||
                     thbits & Tuple_header::DELETE_WAIT) ||
                    ((bits & ScanOp::SCAN_LCP) &&
                     (thbits & Tuple_header::ALLOC)))) {
                jam();
                scan.m_last_seen = __LINE__;
                goto found_tuple;
              }
              /**
               * Ensure that LCP_SKIP bit is clear before we move on
               * It could be set if the row was inserted after LCP
               * start and then followed by a delete of the row before
               * we arrive here.
               */
              if ((bits & ScanOp::SCAN_LCP) &&
                  (thbits & Tuple_header::LCP_SKIP)) {
                jam();
                acquire_frag_mutex(fragPtr.p, key.m_page_no);
                tuple_header_ptr->m_header_bits =
                    thbits & (~Tuple_header::LCP_SKIP);
                DEB_LCP_SKIP(
                    ("(%u)Reset LCP_SKIP on tab(%u,%u), row(%u,%u)"
                     ", header: %x"
                     ", new header: %x"
                     ", tuple_header_ptr: %p",
                     instance(), fragPtr.p->fragTableId, fragPtr.p->fragmentId,
                     key.m_page_no, key.m_page_idx, thbits,
                     tuple_header_ptr->m_header_bits, tuple_header_ptr));
                updateChecksum(tuple_header_ptr, tablePtr.p, thbits,
                               tuple_header_ptr->m_header_bits);
                release_frag_mutex(fragPtr.p, key.m_page_no);
              }
              scan.m_last_seen = __LINE__;
            } else if (bits & ScanOp::SCAN_NR) {
              thbits = tuple_header_ptr->m_header_bits;
              if ((foundGCI = *tuple_header_ptr->get_mm_gci(tablePtr.p)) >
                      scan.m_scanGCI ||
                  foundGCI == 0) {
                /**
                 * foundGCI == 0 means that the row is initialised but has not
                 * yet been committed as part of insert transaction. All other
                 * rows have the GCI entry set to last GCI it was changed, this
                 * is true for even deleted rows as long as the page is still
                 * maintained by the fragment.
                 *
                 * When foundGCI == 0 there are two cases.
                 * The first case is that thbits == Fix_page::FREE_RECORD.
                 * In this case the tuple doesn't exist and should be
                 * deleted if existing in the starting node.
                 * As part of Fix_page::FREE_RECORD the Tuple_header::FREE
                 * bit is set. So this is handled below.
                 * The second case is that thbits == Tuple_header::ALLOC.
                 * In this case the tuple is currently being inserted, but the
                 * transaction isn't yet committed. In this case we will follow
                 * the found_tuple path. This means that we will attempt to
                 * lock the tuple, this will be unsuccessful since the row
                 * is currently being inserted and is locked for write.
                 * When the commit happens the row lock is released and the
                 * copy scan will continue on this row. It will send an INSERT
                 * to the starting node. Most likely the INSERT transaction
                 * was started after the copy scan started, in this case the
                 * INSERT will simply be converted to an UPDATE by the starting
                 * node. If the insert was started before the new replica of
                 * the fragment was included, the INSERT will be performed.
                 * This is the reason why we have to go the extra mile here to
                 * ensure that we don't lose records that are being inserted as
                 * part of long transactions.
                 *
                 * The final problem is when the INSERT is aborted. In this case
                 * we return from the row lock attempt in execACCKEYREF. Since
                 * the row is now in the Tuple_header::FREE state we must
                 * re-read the row. This is handled by changing the pos.m_get
                 * state to Get_tuple instead of Get_next_tuple.
                 */
                if (!(thbits & Tuple_header::FREE ||
                      thbits & Tuple_header::DELETE_WAIT)) {
                  jam();
                  goto found_tuple;
                } else {
                  goto found_deleted_rowid;
                }
              } else if ((thbits & Fix_page::FREE_RECORD) !=
                             Fix_page::FREE_RECORD &&
                         tuple_header_ptr->m_operation_ptr_i != RNIL) {
                jam();
                goto found_tuple;  // Locked tuple...
                // skip free tuple
              }
              DEB_NR_SCAN_EXTRA(
                  ("(%u)NR_SCAN_SKIP:tab(%u,%u) row(%u,%u),"
                   " recGCI: %u, scanGCI: %u, header: %x",
                   instance(), fragPtr.p->fragTableId, fragPtr.p->fragmentId,
                   key.m_page_no, key.m_page_idx, foundGCI, scan.m_scanGCI,
                   thbits));
            } else {
              ndbrequire(c_backup->is_partial_lcp_enabled());
              ndbrequire((bits & ScanOp::SCAN_LCP) &&
                         pos.m_lcp_scan_changed_rows_page);
              Uint32 ret_val;
              if (!pos.m_all_rows) {
                ret_val = move_to_next_change_page_row(
                    scan, page, &tuple_header_ptr, loop_count, size);
                if (ret_val == ZSCAN_FOUND_PAGE_END) {
                  /**
                   * We have finished scanning a CHANGE page where we also
                   * checked the individual parts of the page. In this mode we
                   * perform a very detailed analysis and clear all bits while
                   * scanning. To handle this we set a special bit if anyone
                   * updates any row on the page while we are scanning in this
                   * mode. This ensures that the flag bits are effectively
                   * read-only and only updated by the LCP scan. We don't track
                   * which part of the page was updated in this case, so if any
                   * update has been performed on the page in this state, all
                   * bits on the page are set to ensure that the entire page is
                   * scanned in the next LCP scan.
                   */
                  ndbassert(!page->get_any_changes());
                  page->clear_page_being_lcp_scanned();
                  if (page->get_and_clear_change_while_lcp_scan()) {
                    jamDebug();
                    page->set_all_change_map();
                  }
                  /**
                   * We've finished scanning a page that was filtered using the
                   * bitmaps on the page. We are ready to set the last LCP
                   * state to A.
                   */
                  /* Coverage tested */
                  set_last_lcp_state(fragPtr.p, key.m_page_no,
                                     false /* Set state to A */);
                  scan.m_last_seen = __LINE__;
                  pos.m_get = ScanPos::Get_next_page;
                  break;
                }
              }
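              /**
               * handle_scan_change_page_rows inspects the current row on the
               * CHANGE page: ZSCAN_FOUND_TUPLE is a candidate row to return
               * (checked further at found_tuple), ZSCAN_FOUND_DELETED_ROWID is
               * a deleted rowid to record, and ZSCAN_FOUND_NEXT_ROW means the
               * row is skipped and the scan moves on to the next one.
               */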
              ret_val = handle_scan_change_page_rows(
                  scan, page, tuple_header_ptr, foundGCI, fragPtr.p);
              if (likely(ret_val == ZSCAN_FOUND_TUPLE)) {
                thbits = tuple_header_ptr->m_header_bits;
                goto found_tuple;
              } else if (ret_val == ZSCAN_FOUND_DELETED_ROWID)
                goto found_deleted_rowid;
              ndbrequire(ret_val == ZSCAN_FOUND_NEXT_ROW);
            }
          } else {
            jam();
            /**
             * We've finished scanning a page; for LCPs we are ready to
             * set the last LCP state to A.
             */
            if (bits & ScanOp::SCAN_LCP) {
              jam();
              /* Coverage tested */
              set_last_lcp_state(fragPtr.p, key.m_page_no,
                                 false /* Set state to A */);
              if (!pos.m_all_rows) {
                ndbassert(page->verify_change_maps(jamBuffer()));
              }
              scan.m_last_seen = __LINE__;
            }
            // no more tuples on this page
            pos.m_get = ScanPos::Get_next_page;
          }
        } else {
          jam();
          Var_page *page = (Var_page *)pos.m_page;
          if (key.m_page_idx < page->high_index) {
            jam();
            pos.m_get = ScanPos::Get_next_tuple;
            if (!page->is_free(key.m_page_idx)) {
              tuple_header_ptr = (Tuple_header *)page->get_ptr(key.m_page_idx);
              thbits = tuple_header_ptr->m_header_bits;
              goto found_tuple;
            }
          } else {
            jam();
            // no more tuples on this page
            pos.m_get = ScanPos::Get_next_page;
            break;
          }
        }
        break;  // incr loop count
      found_tuple:
        // found possible tuple to return
        jam();
        {
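          // LCP scans skip rows flagged LCP_SKIP (the rowid was empty when
          // the LCP started); the flag is cleared below so it does not affect
          // the next LCP. All other rows are returned to the caller.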
          // caller has already set pos.m_get to next tuple
          if (likely(!(bits & ScanOp::SCAN_LCP &&
                       thbits & Tuple_header::LCP_SKIP))) {
            Local_key &key_mm = pos.m_key_mm;
            if (likely(!(bits & ScanOp::SCAN_DD))) {
              key_mm = pos.m_key;
              // real page id is already set
              if (bits & ScanOp::SCAN_LCP) {
                c_backup->update_pause_lcp_counter(loop_count);
              }
            } else {
              /**
               * Disk data rows are only accessed in LDM thread.
               * Thus no need to acquire mutex for access here.
               */
              ndbrequire(!m_is_in_query_thread);
              tuple_header_ptr->get_base_record_ref(key_mm);
              // recompute for each disk tuple
              pos.m_realpid_mm = getRealpid(fragPtr.p, key_mm.m_page_no);
            }
            // TUPKEYREQ handles savepoint stuff
            scan.m_state = ScanOp::Current;
            return true;
          } else {
            jam();
            /* Clear LCP_SKIP bit so that it will not show up in next LCP */
            /**
             * We need to use a mutex since otherwise readers could calculate
             * the wrong checksum.
             */
            acquire_frag_mutex(fragPtr.p, key.m_page_no);
            tuple_header_ptr->m_header_bits =
                thbits & ~(Uint32)Tuple_header::LCP_SKIP;

            DEB_LCP_SKIP(
                ("(%u) 3 Reset LCP_SKIP on tab(%u,%u), row(%u,%u)"
                 ", header: %x",
                 instance(), fragPtr.p->fragTableId, fragPtr.p->fragmentId,
                 key.m_page_no, key.m_page_idx, thbits));

            updateChecksum(tuple_header_ptr, tablePtr.p, thbits,
                           tuple_header_ptr->m_header_bits);
            release_frag_mutex(fragPtr.p, key.m_page_no);
            scan.m_last_seen = __LINE__;
          }
        }
        break;

      record_dropped_change_page : {
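        /**
         * The CHANGE page we were about to scan has been dropped; record a
         * DELETE by PAGEID covering the whole page in the LCP.
         */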
        ndbrequire(c_backup->is_partial_lcp_enabled());
        c_backup->update_pause_lcp_counter(loop_count);
        record_delete_by_pageid(signal, frag.fragTableId, frag.fragmentId, scan,
                                key.m_page_no, size, true);
        return false;
      }

      found_deleted_rowid:

        ndbrequire((bits & ScanOp::SCAN_NR) || (bits & ScanOp::SCAN_LCP));
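        /**
         * If the page's last LCP state is D the page was not part of the
         * previous LCP, so an LCP scan need not record a DELETE by ROWID
         * here; we simply note progress and move on.
         */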
        if (!(bits & ScanOp::SCAN_LCP && pos.m_is_last_lcp_state_D)) {
          ndbrequire(bits & ScanOp::SCAN_NR ||
                     pos.m_lcp_scan_changed_rows_page);

          Local_key &key_mm = pos.m_key_mm;
          if (!(bits & ScanOp::SCAN_DD)) {
            jam();
            key_mm = pos.m_key;
            // caller has already set pos.m_get to next tuple
            // real page id is already set
          } else {
            jam();
            /**
             * Currently dead code since NR scans never use Disk data scans.
             */
            ndbrequire(bits & ScanOp::SCAN_NR);
            tuple_header_ptr->get_base_record_ref(key_mm);
            // recompute for each disk tuple
            pos.m_realpid_mm = getRealpid(fragPtr.p, key_mm.m_page_no);

            Fix_page *mmpage = (Fix_page *)c_page_pool.getPtr(pos.m_realpid_mm);
            tuple_header_ptr =
                (Tuple_header *)(mmpage->m_data + key_mm.m_page_idx);
            if ((foundGCI = *tuple_header_ptr->get_mm_gci(tablePtr.p)) >
                    scan.m_scanGCI ||
                foundGCI == 0) {
              thbits = tuple_header_ptr->m_header_bits;
              if (!(thbits & Tuple_header::FREE ||
                    thbits & Tuple_header::DELETE_WAIT)) {
                jam();
                break;
              }
              jam();
            }
          }
          /**
           * This code handles Node recovery, the row might still exist at the
           * starting node although it no longer exists at this live node. We
           * send a DELETE by ROWID to the starting node.
           *
           * This code is also used by LCPs to record deleted row ids.
           */
          c_backup->update_pause_lcp_counter(loop_count);
          record_delete_by_rowid(signal, frag.fragTableId, frag.fragmentId,
                                 scan, pos.m_key_mm, foundGCI, true);
          // TUPKEYREQ handles savepoint stuff
          return false;
        }
        scan.m_last_seen = __LINE__;
        break;  // incr loop count
      default:
        ndbabort();
    }
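    /**
     * loop_count approximates the work done in this invocation. Once it
     * reaches 512 we break out and reschedule the scan via CONTINUEB below
     * so that we don't occupy the thread for too long. For LCP scans the
     * backup block may extend the budget if pausing is not yet required.
     */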
    loop_count += 4;
    if (loop_count >= 512) {
      jam();
      if (bits & ScanOp::SCAN_LCP) {
        jam();
        c_backup->update_pause_lcp_counter(loop_count);
        if (!c_backup->check_pause_lcp()) {
          loop_count = 0;
          continue;
        }
        c_backup->pausing_lcp(5, loop_count);
      }
      break;
    }
  }
  // TODO: at drop table we have to flush and terminate these
  jam();
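  // Real-time break: schedule a CONTINUEB(ZTUP_SCAN) to resume this scan so
  // that other signals get a chance to execute in between.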
  scan.m_last_seen = __LINE__;
  signal->theData[0] = ZTUP_SCAN;
  signal->theData[1] = scanPtr.i;
  if (!c_lqh->rt_break_is_scan_prioritised(scan.m_userPtr)) {
    jam();
    sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
  } else {
    /**
     * Sending with bounded delay means that we allow all signals in the job
     * buffer to be executed until the maximum is reached, which is currently
     * 100. So sending with bounded delay gives a more predictable delay. It
     * might be longer than with priority B, but it will never be longer than
     * 100 signals.
     */
    jam();
    // #ifdef VM_TRACE
    c_debug_count++;
    if (c_debug_count % 10000 == 0) {
      DEB_LCP_DELAY(("(%u)TupScan delayed 10000 times", instance()));
    }
    // #endif
    sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, BOUNDED_DELAY, 2);
  }
  return false;
}