in turbonfs/src/fcsm.cpp [878:1082]
/**
 * Commit completion callback of the flush-commit state machine (FCSM).
 *
 * Must be called from the flush/write callback context, only after a
 * COMMIT covering commit_bytes completed successfully on an inode that is
 * doing unstable writes, and only while the FCSM is running.
 *
 * It:
 *  1. Clears the inode's commit-in-progress flag. This happens *before*
 *     taking flush_lock, because wait_for_ongoing_commit() waits for this
 *     flag with flush_lock held.
 *  2. Advances committed_seq_num by commit_bytes.
 *  3. Under flush_lock, completes every queued commit target whose
 *     commit_seq is now <= committed_seq_num. A blocking target with a
 *     task gets reply_write(). A blocking target with a done flag gets
 *     *done = true. A non-blocking target is simply dropped.
 *  4. If any flush or commit targets remain, flushes more dirty data via
 *     sync_membufs(). Otherwise it marks the FCSM as not running (idle).
 *
 * @param commit_bytes Number of bytes covered by the completed commit,
 *                     must be > 0.
 */
void fcsm::on_commit_complete(uint64_t commit_bytes)
{
    // Commit must be called only for unstable writes.
    assert(!inode->is_stable_write());
    // Must be called only for success.
    assert(inode->get_write_error() == 0);
    assert(commit_bytes > 0);
    // Must be called from flush/write callback.
    assert(fc_cb_running());
    // Commit callback can only be called if FCSM is running.
    assert(is_running());

    /*
     * Commit callback can be called only when commit is in progress, clear
     * it now. Must do it before grabbing the flush_lock, note that
     * wait_for_ongoing_commit() is waiting for commit-in-progress to be
     * cleared, with flush_lock held.
     */
    assert(inode->is_commit_in_progress());
    inode->clear_commit_in_progress();

    // If commit is running, flush cannot be running.
    assert(!inode->get_filecache()->is_flushing_in_progress());
    // commit_pending_bytes must be 0 here.
    assert(inode->get_filecache()->get_bytes_to_commit() == 0);
    // A byte can only be committed after it's flushed successfully.
    assert(committing_seq_num <= flushed_seq_num);
    assert(committing_seq_num <= flushing_seq_num);

    // Update committed_seq_num to account for the commit_bytes.
    committed_seq_num += commit_bytes;

    /*
     * When a commit completes it commits everything that has been flushed
     * till now, and also whatever has been scheduled for commit.
     */
    assert(flushed_seq_num == committed_seq_num);
    assert(committed_seq_num == committing_seq_num);

    AZLogDebug("[{}] [FCSM] on_commit_complete({}), Fd: {}, Fing: {}, "
               "Cd: {}, Cing: {}, Fq: {}, Cq: {}, bytes_flushing: {}",
               inode->get_fuse_ino(),
               commit_bytes,
               flushed_seq_num.load(),
               flushing_seq_num.load(),
               committed_seq_num.load(),
               committing_seq_num.load(),
               ftgtq.size(),
               ctgtq.size(),
               inode->get_filecache()->bytes_flushing.load());

    inode->flush_lock();

    /*
     * This can only come here with stable write true when
     * switch_to_stable_write() was waiting for ongoing commits to complete
     * and it went ahead and set inode stable write after we cleared the
     * commit_in_progress above.
     */
    assert(!inode->is_stable_write() || ctgtq.empty());

    /*
     * Go over all queued commit targets to see if any can be completed after
     * the latest commit completed.
     */
    while (!ctgtq.empty()) {
        struct fctgt& tgt = ctgtq.front();

        assert(tgt.fcsm == this);

        /*
         * ctgtq holds commit targets in increasing order of commit_seq, so
         * as soon as we find one whose commit_seq is greater than
         * committed_seq_num, we can safely skip the rest.
         */
        if (tgt.commit_seq > committed_seq_num) {
            break;
        }

        if (tgt.task) {
            // Only one of task or done can be present.
            assert(!tgt.done);
            assert(tgt.task->magic == RPC_TASK_MAGIC);
            assert(tgt.task->get_op_type() == FUSE_WRITE);
            assert(tgt.task->rpc_api->write_task.is_fe());
            assert(tgt.task->rpc_api->write_task.get_size() > 0);

            AZLogDebug("[{}] [FCSM] completing blocking commit target: {}, "
                       "committed_seq_num: {}, write task: [{}, {})",
                       inode->get_fuse_ino(),
                       tgt.commit_seq,
                       committed_seq_num.load(),
                       tgt.task->rpc_api->write_task.get_offset(),
                       tgt.task->rpc_api->write_task.get_offset() +
                       tgt.task->rpc_api->write_task.get_size());

            // Reply to the FUSE write that was held until this commit.
            tgt.task->reply_write(
                tgt.task->rpc_api->write_task.get_size());
        } else if (tgt.done) {
            AZLogDebug("[{}] [FCSM] completing blocking commit target: {}, "
                       "committed_seq_num: {}",
                       inode->get_fuse_ino(),
                       tgt.commit_seq,
                       committed_seq_num.load());

            // Signal the waiter that owns this flag.
            assert(*tgt.done == false);
            *tgt.done = true;
        } else {
            AZLogDebug("[{}] [FCSM] completing non-blocking commit target: {}, "
                       "committed_seq_num: {}",
                       inode->get_fuse_ino(),
                       tgt.commit_seq,
                       committed_seq_num.load());
        }

        // Commit target accomplished, remove from queue.
        ctgtq.pop();
    }

    /*
     * See if we have more flush/commit targets and issue flush for the same.
     */
    if (!ftgtq.empty() || !ctgtq.empty()) {
        /*
         * If we have any commit target here it must have commit_seq greater
         * than committed_seq_num, else it would have been completed by the
         * above loop.
         * If we have any flush target it must have flush_seq greater than
         * flushed_seq_num. This is because commit would have started after
         * the flush and we would have completed all eligible flush targets.
         */
        assert(ftgtq.empty() || ftgtq.front().flush_seq > flushed_seq_num);
        assert(ctgtq.empty() || ctgtq.front().commit_seq > committed_seq_num);

        /*
         * Out-param filled by the filecache helpers below. Initialize it so
         * the flushing_seq_num assert after sync_membufs() never reads an
         * indeterminate value, even if a helper leaves it untouched.
         */
        uint64_t bytes = 0;
        std::vector<bytes_chunk> bc_vec;

        /*
         * This means we are here after switch_to_stable_write() switched to
         * stable, we need to handle that.
         */
        if (inode->is_stable_write()) {
            assert(ctgtq.empty());
            bc_vec =
                inode->get_filecache()->get_dirty_nonflushing_bcs_range(
                    0, UINT64_MAX, &bytes);
            // Here bc_vec can be empty, sync_membufs() can handle that.
        } else {
            bc_vec =
                inode->get_filecache()->get_contiguous_dirty_bcs(&bytes);

            /*
             * A pending flush or commit target still needs more data to be
             * flushed. No flush was in progress when this callback started,
             * so the corresponding bcs must be dirty in the file cache.
             *
             * Note: We cannot have this assert for the stable write case, as
             *       sync_membufs() that called switch_to_stable_write(), might
             *       have consumed all these bcs and marked them flushing. When
             *       we come here we won't find any dirty-and-not-flushing bcs.
             */
            assert(!bc_vec.empty());
            assert(bytes > 0);

            /*
             * TODO: Handle the case where the ftgt wants us to do commit_full.
             */
        }

        // flushed_seq_num can never be more than flushing_seq_num.
        assert(flushed_seq_num <= flushing_seq_num);

        // Note: this logs the values *before* sync_membufs() issues the flush.
        AZLogDebug("[{}] [FCSM] continuing, flushing_seq_num now: {}, "
                   "flushed_seq_num: {}",
                   inode->get_fuse_ino(),
                   flushing_seq_num.load(),
                   flushed_seq_num.load());

        // sync_membufs() will update flushing_seq_num() and mark fcsm running.
        [[maybe_unused]]
        const uint64_t prev_flushing_seq_num = flushing_seq_num;
        inode->sync_membufs(bc_vec, false /* is_flush */);
        assert(flushing_seq_num == (prev_flushing_seq_num + bytes));
    } else {
        AZLogDebug("[{}] [FCSM] idling, flushed_seq_num now: {}, "
                   "committed_seq_num: {}",
                   inode->get_fuse_ino(),
                   flushed_seq_num.load(),
                   committed_seq_num.load());

        // FCSM should not idle when there's any ongoing flush or commit.
        assert(flushing_seq_num == flushed_seq_num);
        assert(committing_seq_num == committed_seq_num);
        assert(flushed_seq_num == committed_seq_num);
        assert(!inode->get_filecache()->is_flushing_in_progress());
        assert(!inode->is_commit_in_progress());

        clear_running();
    }

    inode->flush_unlock();
}