Status PersistentMemoryMalloc::AsyncFlushPages()

in src/core/persistent_memory_malloc.h [717:790]


/// Starts one asynchronous, page-sized write to `file` for every page in
/// [start_page, until_address.page()), plus the page until_address.page() itself when
/// until_address has a non-zero offset.
///
/// Before a page's write is issued, its flush status is switched to InProgress (its close
/// status is carried over unchanged) and its LastFlushedUntilAddress is reset to 0. The
/// completion callback records the flushed-until address, marks the page Flushed, clears and
/// reopens the page if it was found closed, and finally calls ShiftFlushedUntilAddress() on
/// the allocator.
///
/// @param start_page        First page to flush.
/// @param until_address     Flush limit; the address recorded for a page on completion is
///                          min(start of the next page, until_address).
/// @param serialize_objects Not referenced anywhere in this function body.
/// @return Status::Ok once every write has been issued. A WriteAsync() result is passed to
///         RETURN_NOT_OK, which presumably returns a non-Ok status to the caller immediately
///         (confirm against the macro definition).
Status PersistentMemoryMalloc<D>::AsyncFlushPages(uint32_t start_page, Address until_address,
    bool serialize_objects) {
  /// Per-page state handed to the write-completion callback.
  class FlushContext : public IAsyncContext {
   public:
    FlushContext(alloc_t* allocator_, uint32_t page_, Address until_address_)
      : allocator{ allocator_ }
      , page{ page_ }
      , until_address{ until_address_ } {
    }
    /// Copy constructor, used when the context is deep-copied.
    FlushContext(const FlushContext& other)
      : allocator{ other.allocator }
      , page{ other.page }
      , until_address{ other.until_address } {
    }
   protected:
    Status DeepCopy_Internal(IAsyncContext*& context_copy) final {
      return IAsyncContext::DeepCopy_Internal(*this, context_copy);
    }
   public:
    /// Allocator that owns the page being flushed.
    alloc_t* allocator;
    /// Page number being flushed.
    uint32_t page;
    /// Address to record as flushed once the write completes.
    Address until_address;
  };

  // Completion handler for a single page write. bytes_transferred is not used here.
  auto on_write_done = [](IAsyncContext* ctxt, Status result, size_t bytes_transferred) {
    CallbackContext<FlushContext> flush{ ctxt };
    if(result != Status::Ok) {
      // A failure is only reported; the page is still marked as flushed below.
      fprintf(stderr, "AsyncFlushPages(), error: %u\n", static_cast<uint8_t>(result));
    }

    alloc_t* const owner = flush->allocator;
    const uint32_t page = flush->page;
    auto& page_status = owner->PageStatus(page);

    page_status.LastFlushedUntilAddress.store(flush->until_address);

    // Move the flush half of the status to Flushed while preserving whatever close status is
    // current. On a failed exchange `observed` is refreshed, so the next attempt rebuilds
    // the desired value from the latest close status.
    FlushCloseStatus observed = page_status.status.load();
    for(;;) {
      FlushCloseStatus desired{ FlushStatus::Flushed, observed.close };
      if(page_status.status.compare_exchange_weak(observed, desired)) {
        break;
      }
    }

    if(observed.close == CloseStatus::Closed) {
      // The page had already been closed when the flush completed, so clearing it and
      // reopening it falls to this callback.
      std::memset(owner->Page(page), 0, kPageSize);
      page_status.status.store(FlushStatus::Flushed, CloseStatus::Open);
    }
    owner->ShiftFlushedUntilAddress();
  };

  // A non-zero offset means the page containing until_address is partially filled and must be
  // flushed as well.
  const uint32_t partial_page = (until_address.offset() > 0) ? 1 : 0;
  const uint32_t page_count = until_address.page() - start_page + partial_page;
  assert(page_count > 0);

  const uint32_t end_page = start_page + page_count;
  for(uint32_t flush_page = start_page; flush_page < end_page; ++flush_page) {
    // Start of this page; not referenced below.
    Address page_start_address{ flush_page, 0 };
    Address page_end_address{ flush_page + 1, 0 };

    // The last page may end at until_address rather than at the page boundary.
    FlushContext context{ this, flush_page, std::min(page_end_address, until_address) };

    // Mark the page as flush-in-progress, keeping its current close status.
    auto& page_status = PageStatus(flush_page);
    FlushCloseStatus observed = page_status.status.load();
    for(;;) {
      FlushCloseStatus desired{ FlushStatus::InProgress, observed.close };
      if(page_status.status.compare_exchange_weak(observed, desired)) {
        break;
      }
    }
    page_status.LastFlushedUntilAddress.store(0);

    // Write the whole page at file offset kPageSize * flush_page.
    RETURN_NOT_OK(file->WriteAsync(Page(flush_page), kPageSize * flush_page, kPageSize,
                                   on_write_done, context));
  }
  return Status::Ok;
}