in turbonfs/src/fcsm.cpp [658:871]
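/*
* Ensure that all dirty data currently in the file cache gets flushed.
* If 'task' is passed, the frontend write is completed once the flush target
* is reached; if 'done' is passed, it's set to true instead (at most one of
* the two may be passed). If 'flush_full_unstable' is true and the dirty data
* is not contiguous, the inode is switched to stable writes so that *all*
* dirty data can be flushed.
*/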
void fcsm::ensure_flush(uint64_t write_off,
uint64_t write_len,
struct rpc_task *task,
std::atomic<bool> *done,
bool flush_full_unstable)
{
assert(inode->is_flushing);
/*
* At most one of task and done can be passed (both can be null).
*/
assert((!!task + !!done) < 2);
assert(!done || (*done == false));
/*
* If any flush/commit target is awaiting completion, the state machine
* must be running.
*/
assert(is_running() || (ctgtq.empty() && ftgtq.empty()));
assert(is_running() || (flushed_seq_num == flushing_seq_num));
assert(is_running() || (committed_seq_num == committing_seq_num));
#if 0
assert(!flush_full_unstable || !is_running());
#endif
AZLogDebug("[{}] [FCSM] ensure_flush<{}> write req [{}, {}], task: {}, "
"done: {}",
inode->get_fuse_ino(),
task ? "blocking" : "non-blocking",
write_off, write_off + write_len,
fmt::ptr(task),
fmt::ptr(done));
// flushed_seq_num can never be more than flushing_seq_num.
assert(flushed_seq_num <= flushing_seq_num);
if (task) {
// task provided must be a frontend write task.
assert(task->magic == RPC_TASK_MAGIC);
assert(task->get_op_type() == FUSE_WRITE);
assert(task->rpc_api->write_task.is_fe());
assert(task->rpc_api->write_task.get_size() > 0);
// write_len and write_off must match those of the task.
assert(task->rpc_api->write_task.get_size() == write_len);
assert(task->rpc_api->write_task.get_offset() == (off_t) write_off);
}
/*
* What will be the flushed_seq_num value after *all* current dirty bytes
* are flushed? That becomes our target flushed_seq_num.
* Since bytes_chunk_cache::{bytes_dirty,bytes_flushing} are not updated
* under the flush_lock, we can race and compute a target_flushed_seq_num
* smaller than what is already queued in the latest flush target. In that
* case, just wait for the larger value.
*/
const uint64_t bytes_to_flush =
inode->get_filecache()->get_bytes_to_flush();
const uint64_t last_flush_seq =
!ftgtq.empty() ? ftgtq.front().flush_seq : 0;
const uint64_t target_flushed_seq_num =
std::max((flushing_seq_num + bytes_to_flush), last_flush_seq);
/*
* If the state machine is already running, we just need to add an
* appropriate flush target and return. When the ongoing operation
* completes, this flush would be dispatched.
*/
if (is_running()) {
#ifndef NDEBUG
/*
* Flush targets must always be added in increasing flush_seq order.
*/
if (!ftgtq.empty()) {
assert(ftgtq.front().flush_seq <= target_flushed_seq_num);
assert(ftgtq.front().commit_seq == 0);
}
#endif
#ifdef ENABLE_PARANOID
/*
* Since we are adding a flush target, make sure we have at least that
* much dirty data in the chunkmap.
*/
{
uint64_t bytes;
std::vector<bytes_chunk> bc_vec =
inode->get_filecache()->get_dirty_nonflushing_bcs_range(
0, UINT64_MAX, &bytes);
assert(bc_vec.empty() == (bytes == 0));
assert(bytes >= bytes_to_flush);
for (auto& bc : bc_vec) {
bc.get_membuf()->clear_inuse();
}
}
#endif
/*
* If the new target doesn't go beyond the already queued flush target
* and the caller doesn't need to be notified, don't add a duplicate
* target. The already queued target will ensure the requested flush is
* done.
*/
if (!task && !done &&
(target_flushed_seq_num == last_flush_seq)) {
return;
}
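/*
* Queue the flush target. It'll be dispatched once the ongoing
* flush/commit completes, as noted above.
*/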
ftgtq.emplace(this,
target_flushed_seq_num /* target flush_seq */,
0 /* commit_seq */,
task,
done,
flush_full_unstable);
return;
}
/*
* FCSM not running.
*/
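// With no flush in flight, flushed_seq_num must have caught up with
// flushing_seq_num, and the new target can only be at or ahead of them.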
assert(flushed_seq_num == flushing_seq_num);
assert(target_flushed_seq_num >= flushing_seq_num);
// No new data to flush.
if (target_flushed_seq_num == flushed_seq_num) {
if (task) {
assert(!done);
task->reply_write(task->rpc_api->write_task.get_size());
} else if (done) {
assert(*done == false);
*done = true;
}
return;
}
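/*
* Collect the dirty membufs to flush. How we pick them depends on
* whether the inode is doing stable or unstable writes.
*/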
uint64_t bytes;
std::vector<bytes_chunk> bc_vec;
if (inode->is_stable_write()) {
bc_vec = inode->get_filecache()->get_dirty_nonflushing_bcs_range(
0, UINT64_MAX, &bytes);
/*
* Dirty flushable data can increase after the get_bytes_to_flush() call
* above, as more dirty data may be added, but no dirty data can move to
* flushing since we hold the flush_lock.
*/
assert(bytes >= bytes_to_flush);
} else {
bc_vec = inode->get_filecache()->get_contiguous_dirty_bcs(&bytes);
/*
* If the caller wants us to flush *all* dirty data and not all of it is
* contiguous, we must switch to stable writes and flush *all* dirty
* data.
*/
if (flush_full_unstable && bytes_to_flush > bytes) {
/*
* Release inuse count held by get_contiguous_dirty_bcs().
*/
for (bytes_chunk& bc : bc_vec) {
struct membuf *mb = bc.get_membuf();
mb->clear_inuse();
}
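// We must not have any commit or flush in progress when switching to
// stable writes; the asserts below verify that.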
assert(!inode->is_commit_in_progress());
assert(!inode->get_filecache()->is_flushing_in_progress());
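/*
* Re-query *all* dirty (non-flushing) data and switch the inode to
* stable writes so that the non-contiguous dirty data can also be
* flushed.
*/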
bc_vec = inode->get_filecache()->get_dirty_nonflushing_bcs_range(
0, UINT64_MAX, &bytes);
assert(bytes >= bytes_to_flush);
inode->set_stable_write();
}
}
assert(bc_vec.empty() == (bytes == 0));
assert(bytes > 0);
/*
* Kickstart the state machine.
* We pass nullptr as the 3rd arg to sync_membufs(), so sync_membufs()
* itself will not complete the fuse callback. Instead, task/done is
* attached to the flush target enqueued below and completed once the
* flush catches up with target_flushed_seq_num. sync_membufs() returns
* after issuing the backend writes, which complete asynchronously.
*
* Note: Had we passed the task to sync_membufs(), it could have completed
* (and freed) the task before returning. Since we pass nullptr, the task
* remains valid and can safely be enqueued to ftgtq after the call.
*/
AZLogDebug("[{}] [FCSM] kicking, flushing_seq_num now: {} "
"flushed_seq_num: {}",
inode->get_fuse_ino(),
flushing_seq_num.load(),
flushed_seq_num.load());
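/*
* Snapshot flushing_seq_num so we can assert below that sync_membufs()
* advances it by exactly the number of bytes being flushed.
*/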
[[maybe_unused]]
const uint64_t flushing_seq_num_before = flushing_seq_num;
assert(flushed_seq_num <= flushing_seq_num);
/*
* sync_membufs() will update flushing_seq_num and mark fcsm running.
* Task is not passed to sync_membufs, but enqueued to ftgtq.
*/
inode->sync_membufs(bc_vec, false /* is_flush */, nullptr);
assert(is_running());
assert(flushing_seq_num == (flushing_seq_num_before + bytes));
assert(flushed_seq_num <= flushing_seq_num);
/*
* Enqueue a flush target so that the caller is notified once all data
* up to target_flushed_seq_num has been flushed.
*/
ftgtq.emplace(this,
target_flushed_seq_num /* target flush_seq */,
0 /* commit_seq */,
task,
done);
}