in turbonfs/src/fcsm.cpp [413:653]
/**
 * Make sure dirty/commit-pending data of this (unstable write) inode gets
 * flushed and committed, optionally notifying the caller on completion.
 *
 * Depending on the current state this either
 *  - completes the caller right away (nothing to commit),
 *  - queues a commit target on the already running state machine,
 *  - starts a flush via ensure_flush() and queues a commit target behind it, or
 *  - starts a commit via inode->commit_membufs() and queues a commit target.
 *
 * @param write_off   Offset of the write that triggered this call. Only used
 *                    for logging in this function.
 * @param write_len   Length of the write that triggered this call. Only used
 *                    for logging in this function.
 * @param task        Optional frontend write task. If there is nothing to
 *                    commit (or the inode turns stable) it is completed here
 *                    through reply_write(), else it is attached to the queued
 *                    commit target.
 * @param done        Optional completion flag, must be false on entry. Set to
 *                    true here if there is nothing to commit, else attached to
 *                    the queued target (or handed to ensure_flush() if the
 *                    inode turned stable).
 * @param commit_full Caller wants *all* dirty data flushed and committed.
 *                    Requires 'done' and forbids 'task'.
 *
 * Caller must have inode->is_flushing set and the inode must be doing
 * unstable writes (both are asserted).
 */
void fcsm::ensure_commit(uint64_t write_off,
                         uint64_t write_len,
                         struct rpc_task *task,
                         std::atomic<bool> *done,
                         bool commit_full)
{
    assert(inode->is_flushing);
    assert(!inode->is_stable_write());

    /*
     * Caller passes commit_full when they want all dirty data to be flushed
     * and committed (o/w ensure_commit() can choose how much to flush/commit
     * based on configured limits). In such case it will pass a pointer to an
     * atomic bool 'done' which we should set to true once the flush/commit is
     * done. Only one of task and done can be passed.
     */
    assert(!commit_full || (task == nullptr));
    assert(!commit_full || (done != nullptr));
    assert((!!task + !!done) < 2);
    assert(!done || (*done == false));

    /*
     * If any of the flush/commit targets are waiting completion, state machine
     * must be running.
     */
    assert(is_running() || (ctgtq.empty() && ftgtq.empty()));
    assert(is_running() || (flushed_seq_num == flushing_seq_num));
    assert(is_running() || (committed_seq_num == committing_seq_num));

    /*
     * Note: The format string must have one placeholder per argument, else
     *       the " FULL" marker lands in the write offset slot and 'done' is
     *       never printed.
     */
    AZLogDebug("[{}] [FCSM] ensure_commit<{}>{} write req [{}, {}], task: {}, "
               "done: {}",
               inode->get_fuse_ino(),
               task ? "blocking" : "non-blocking",
               commit_full ? " FULL" : "",
               write_off, write_off + write_len,
               fmt::ptr(task),
               fmt::ptr(done));

    // committed_seq_num can never be more than committing_seq_num.
    assert(committed_seq_num <= committing_seq_num);

    // we can only commit bytes which are flushed.
    assert(committing_seq_num <= flushed_seq_num);

    if (task) {
        // task provided must be a frontend write task.
        assert(task->magic == RPC_TASK_MAGIC);
        assert(task->get_op_type() == FUSE_WRITE);
        assert(task->rpc_api->write_task.is_fe());
        assert(task->rpc_api->write_task.get_size() > 0);
    }

    /*
     * Find how many bytes we would like to commit.
     * If there are some commit-pending bytes we commit all of those, else
     * we set a commit target large enough to flush+commit all leaving
     * one full sized dirty extent.
     */
    uint64_t commit_bytes =
        inode->get_filecache()->get_bytes_to_commit();

    /*
     * Only known caller to pass commit_full as true is flush_cache_and_wait().
     * It will first drain all commit_pending data by making a call to
     * wait_for_ongoing_flush() before calling us, so we should not have any
     * commit_pending data. Now we need to flush *all* dirty bytes and commit
     * them and let the caller know once flush+commit is done.
     */
    if (commit_full) {
        assert(done && !*done);
        assert(commit_bytes == 0);
        commit_bytes = inode->get_filecache()->get_bytes_to_flush();
    } else if (commit_bytes == 0) {
        /*
         * TODO: Make sure this doesn't result in small-blocks being written.
         *
         * The difference may be "negative" when there is less dirty data than
         * one full dirty extent, hence the signed type and the clamp to 0.
         */
        const int64_t bytes =
            (inode->get_filecache()->get_bytes_to_flush() -
             inode->get_filecache()->max_dirty_extent_bytes());

        commit_bytes = std::max(bytes, static_cast<int64_t>(0));
    }

    /*
     * No new bytes to commit, complete the task if it was a blocking call.
     */
    if (commit_bytes == 0) {
        AZLogDebug("COMMIT BYTES ZERO");

        if (task) {
            assert(!done);
            task->reply_write(task->rpc_api->write_task.get_size());
        } else if (done) {
            assert(*done == false);
            *done = true;
        }

        return;
    }

    /*
     * What will be the committed_seq_num value after commit_bytes are
     * committed?
     * Since commit_pending_bytes can reduce as another thread could be running
     * commit completion in parallel, we may compute a
     * target_committed_seq_num lower than the last queued commit_seq, so take
     * the max. The last queued target is the one at the back of the queue
     * (front is the oldest one).
     */
    const uint64_t last_commit_seq =
        !ctgtq.empty() ? ctgtq.back().commit_seq : 0;
    const uint64_t target_committed_seq_num =
        std::max((committed_seq_num + commit_bytes), last_commit_seq);

    /*
     * If the state machine is already running, we just need to add an
     * appropriate commit target and return. When the ongoing operation
     * completes, this commit would be dispatched.
     * Make sure to convey commit_full correctly via the target.
     */
    if (is_running()) {
#ifndef NDEBUG
        /*
         * Make sure commit targets are always added in an increasing
         * commit_seq, i.e., new target must not be lower than the most
         * recently queued one.
         */
        if (!ctgtq.empty()) {
            assert(ctgtq.back().commit_seq <= target_committed_seq_num);
            assert(ctgtq.back().flush_seq == 0);
        }
#endif
        ctgtq.emplace(this,
                      0 /* target flush_seq */,
                      target_committed_seq_num /* target commit_seq */,
                      task,
                      done,
                      commit_full);
        return;
    }

    /*
     * FCSM not running.
     * flushed_seq_num tells us how much data is already flushed. If it's less
     * than the target_committed_seq_num, we need to schedule a flush to catch
     * up with the target_committed_seq_num.
     */
    if (flushed_seq_num < target_committed_seq_num) {
        AZLogDebug("[{}] [FCSM] not running, schedule a new flush to catch up, "
                   "flushed_seq_num: {}, target_committed_seq_num: {}, "
                   "stable: {}",
                   inode->get_fuse_ino(),
                   flushed_seq_num.load(),
                   target_committed_seq_num,
                   inode->is_stable_write());

        /*
         * ensure_flush()->sync_membufs() below may convert this inode to stable
         * writes. In that case we should let caller know of completion once all
         * dirty data is flushed, else we want to let caller know once all data
         * is flushed and committed.
         *
         * commit_full amounts to flush_full_unstable.
         */
        ensure_flush(task ? task->rpc_api->write_task.get_offset() : 0,
                     task ? task->rpc_api->write_task.get_size() : 0,
                     nullptr,
                     nullptr,
                     commit_full);

        /*
         * ensure_flush() flushes *all* dirty data, so it must have scheduled
         * flushing till target_committed_seq_num.
         */
        assert(flushing_seq_num >= target_committed_seq_num);

        if (!inode->is_stable_write()) {
            /*
             * Enqueue a commit target to be triggered once the flush completes.
             */
            ctgtq.emplace(this,
                          0 /* target flush_seq */,
                          target_committed_seq_num /* target commit_seq */,
                          task,
                          done);
        } else {
            /*
             * Caller wanted to wait till commit completes, but now the inode
             * has been converted to stable writes, there won't be any commits,
             * complete the task.
             */
            if (task) {
                assert(!commit_full);
                assert(!done);
                task->reply_write(task->rpc_api->write_task.get_size());
            } else if (done) {
                assert(commit_full);
                assert(*done == false);
                // Let the flush path signal 'done' to the caller.
                ensure_flush(0, 0, nullptr, done);
            }
        }

        return;
    } else {
        /*
         * No new data to flush for the current commit goal, just add a commit
         * target and we are done.
         * Since FCSM is not running and we discovered that we have one or more
         * bytes to be committed, get_commit_pending_bcs() MUST return those.
         */
        AZLogDebug("[{}] [FCSM] not running, schedule a new commit, "
                   "flushed_seq_num: {}, "
                   "target_committed_seq_num: {}",
                   inode->get_fuse_ino(),
                   flushed_seq_num.load(),
                   target_committed_seq_num);

        uint64_t bytes;
        std::vector<bytes_chunk> bc_vec =
            inode->get_filecache()->get_commit_pending_bcs(&bytes);
        assert(!bc_vec.empty());
        assert(bytes > 0);

        // With FCSM not running, these should be same.
        assert(committing_seq_num == committed_seq_num);
        [[maybe_unused]]
        const uint64_t prev_committing_seq_num = committing_seq_num;

        inode->commit_membufs(bc_vec);

        // commit_membufs() is expected to start FCSM and account the bytes.
        assert(is_running());
        assert(committing_seq_num == (prev_committing_seq_num + bytes));
        assert(committing_seq_num > committed_seq_num);

        /*
         * Enqueue a commit target for caller to be notified when all data
         * till target_committed_seq_num is flushed+committed. In case
         * commit_full is true, above commit_membufs() may not be sufficient
         * to commit all that data, but FCSM will ensure that all the requested
         * data is flushed and committed.
         */
        ctgtq.emplace(this,
                      0 /* target flush_seq */,
                      target_committed_seq_num /* target commit_seq */,
                      task,
                      done);
    }
}