in turbonfs/src/fcsm.cpp [1092:1347]
void fcsm::on_flush_complete(uint64_t flush_bytes)
{
// Must be called only for success.
assert(inode->get_write_error() == 0);
assert(flush_bytes > 0);
// Must be called from flush/write callback.
assert(fc_cb_running());
// See below why we cannot assert this.
#if 0
// Flush callback can only be called if FCSM is running.
assert(is_running);
#endif
/*
* Commit will only be run after current flush completes.
* Since we are inside flush completion callback, commit cannot be
* running yet.
*/
assert(!inode->is_commit_in_progress());
// a byte can only be committed after it's flushed successfully.
assert(committing_seq_num <= flushed_seq_num);
assert(committed_seq_num <= committing_seq_num);
assert(committing_seq_num <= flushing_seq_num);
// Update flushed_seq_num to account for the newly flushed bytes.
flushed_seq_num += flush_bytes;
// flushed_seq_num can never go more than flushing_seq_num.
assert(flushed_seq_num <= flushing_seq_num);
AZLogDebug("[{}] [FCSM] on_flush_complete({}), Fd: {}, Fing: {}, "
"Cd: {}, Cing: {}, Fq: {}, Cq: {}, bytes_flushing: {}",
inode->get_fuse_ino(),
flush_bytes,
flushed_seq_num.load(),
flushing_seq_num.load(),
committed_seq_num.load(),
committing_seq_num.load(),
ftgtq.size(),
ctgtq.size(),
inode->get_filecache()->bytes_flushing.load());
/*
* If this is not the last completing flush (of the multiple parallel
* flushes that sync_membufs() may start), don't do anything.
* Only the last completing flush checks flush targets, as we cannot
* start a new flush or commit till the current flush completes fully.
*/
if (inode->get_filecache()->is_flushing_in_progress()) {
return;
}
inode->flush_lock();
/*
* Multiple libnfs (callback) threads can find is_flushing_in_progress()
* return false. The first one to get the flush_lock, gets to run the
* queued flush targets which includes completing the waiting tasks and/or
* trigger pending flush/commit. Other flush callback threads which get
* the lock after the first one, should simply return. They check for
* one of the following conditions to avoid duplicating work:
* 1. The first one didn't find anything to do, so it stopped the FSCM.
* 2. The first one triggered a flush target.
* 3. The first one triggered a commit target.
*/
if (inode->get_filecache()->is_flushing_in_progress() ||
inode->is_commit_in_progress() ||
!is_running()) {
assert(is_running() || ftgtq.empty());
inode->flush_unlock();
return;
}
/*
* Entire flush is done and no new flush can start, so flushed_seq_num must
* match flushing_seq_num.
*/
assert(flushed_seq_num == flushing_seq_num);
/*
* Go over all queued flush targets to see if any can be completed after
* the latest flush completed.
*/
while (!ftgtq.empty()) {
struct fctgt& tgt = ftgtq.front();
assert(tgt.fcsm == this);
/*
* ftgtq has flush targets in increasing order of flushed_seq_num, so
* as soon as we find one that's greater than flushed_seq_num, we can
* safely skip the rest.
*/
if (tgt.flush_seq > flushed_seq_num) {
break;
}
if (tgt.task) {
// Only one of task or done can be present.
assert(!tgt.done);
assert(tgt.task->magic == RPC_TASK_MAGIC);
assert(tgt.task->get_op_type() == FUSE_WRITE);
assert(tgt.task->rpc_api->write_task.is_fe());
assert(tgt.task->rpc_api->write_task.get_size() > 0);
AZLogDebug("[{}] [FCSM] completing blocking flush target: {}, "
"flushed_seq_num: {}, write task: [{}, {})",
inode->get_fuse_ino(),
tgt.flush_seq,
flushed_seq_num.load(),
tgt.task->rpc_api->write_task.get_offset(),
tgt.task->rpc_api->write_task.get_offset() +
tgt.task->rpc_api->write_task.get_size());
tgt.task->reply_write(
tgt.task->rpc_api->write_task.get_size());
} else if (tgt.done) {
AZLogDebug("[{}] [FCSM] completing blocking flush target: {}, "
"flushed_seq_num: {}",
inode->get_fuse_ino(),
tgt.flush_seq,
flushed_seq_num.load());
assert(*tgt.done == false);
*tgt.done = true;
} else {
AZLogDebug("[{}] [FCSM] completing non-blocking flush target: {}, "
"flushed_seq_num: {}",
inode->get_fuse_ino(),
tgt.flush_seq,
flushed_seq_num.load());
}
// Flush target accomplished, remove from queue.
ftgtq.pop();
}
/*
* We just completed a flush. See if we have some commit targets that we
* should trigger now. A commit target can only be triggered if we have
* flushed all bytes till the commit target.
* We check commit target before any other flush targets as committing
* helps us free memory.
*/
if (!ctgtq.empty() && (flushed_seq_num >= ctgtq.front().commit_seq)) {
assert(!inode->is_stable_write());
uint64_t bytes;
std::vector<bytes_chunk> bc_vec =
inode->get_filecache()->get_commit_pending_bcs(&bytes);
assert(bc_vec.empty() == (bytes == 0));
/*
* Since we have a commit target asking more data to be committed, we
* must have the corresponding bcs in the file cache.
*/
assert(!bc_vec.empty());
assert(bytes > 0);
/*
* commit_membufs() must increase committing_seq_num exactly by bytes,
* as all the bcs in bc_vec should be committed.
*/
[[maybe_unused]]
const uint64_t prev_committing_seq_num = committing_seq_num;
inode->commit_membufs(bc_vec);
assert(committing_seq_num == (prev_committing_seq_num + bytes));
} else if ((!ftgtq.empty() && (ftgtq.front().flush_seq > flushing_seq_num)) ||
(!ctgtq.empty() && (ctgtq.front().commit_seq > flushing_seq_num))) {
/*
* Nothing to commit, or what we want to commit has not yet flushed
* successfully. Do we want to flush more? We check two things:
* 1. Is there an explicit flush target which has not yet started
* flushing?
* 2. Is there an commit target which implies flushing?
*
* If the next flush or commit target has its flush issued, then we
* just have to wait for that flush to complete and then we will decide
* the next action, else issue it now.
*/
uint64_t bytes;
std::vector<bytes_chunk> bc_vec;
if (inode->is_stable_write()) {
bc_vec = inode->get_filecache()->get_dirty_nonflushing_bcs_range(
0, UINT64_MAX,
&bytes);
} else {
bc_vec = inode->get_filecache()->get_contiguous_dirty_bcs(&bytes);
/*
* TODO: Handle the case where the ftgt wants us to do commit_full.
*/
}
/*
* Since we have a flush target asking more data to be flushed, we must
* have the corresponding bcs in the file cache.
*/
assert(!bc_vec.empty());
// We should flush all the dirty data in the chunkmap.
[[maybe_unused]]
const uint64_t next_goal =
std::max((ftgtq.empty() ? 0 : ftgtq.front().flush_seq),
(ctgtq.empty() ? 0 : ctgtq.front().commit_seq));
assert(bytes >= (next_goal - flushing_seq_num));
// flushed_seq_num can never be more than flushing_seq_num.
assert(flushed_seq_num <= flushing_seq_num);
AZLogDebug("[{}] [FCSM] continuing, flushing_seq_num now: {}, "
"flushed_seq_num: {}, bc_vec.size(): {}, FQ: {}, CQ: {}",
inode->get_fuse_ino(),
flushing_seq_num.load(),
flushed_seq_num.load(),
bc_vec.size(),
ftgtq.size(), ctgtq.size());
// sync_membufs() will update flushing_seq_num.
[[maybe_unused]]
const uint64_t prev_flushing_seq_num = flushing_seq_num;
inode->sync_membufs(bc_vec, false /* is_flush */);
assert(flushing_seq_num == (prev_flushing_seq_num + bytes));
} else if (ftgtq.empty() && ctgtq.empty()) {
/*
* No flush to issue, if we don't have any to wait for, then we can
* stop the state machine.
*/
AZLogDebug("[{}] [FCSM] idling, flushing_seq_num now: {}, "
"flushed_seq_num: {}",
inode->get_fuse_ino(),
flushing_seq_num.load(),
flushed_seq_num.load());
// FCSM should not idle when there's any ongoing flush.
assert(flushing_seq_num >= flushed_seq_num);
/*
* TODO: Modify flush_cache_and_wait() to also use the FCSM for
* performing the flush. Then we have any flush or commit
* only peformed by the state machine.
*/
assert(!inode->get_filecache()->is_flushing_in_progress());
assert(!inode->is_commit_in_progress());
clear_running();
} else {
AZLogCrit("Should not reach here");
assert(0);
}
inode->flush_unlock();
}