in turbonfs/src/nfs_inode.cpp [947:1131]
int nfs_inode::copy_to_cache(const struct fuse_bufvec* bufv,
off_t offset,
uint64_t *extent_left,
uint64_t *extent_right)
{
/*
* XXX We currently only handle bufv with count=1.
* Ref aznfsc_ll_write_buf().
*/
assert(bufv->count == 1);
/*
* copy_to_cache() must be called only for a regular file and it must have
* filecache initialized.
*/
assert(is_regfile());
assert(has_filecache());
assert(offset < (off_t) AZNFSC_MAX_FILE_SIZE);
assert(bufv->idx < bufv->count);
const size_t length = bufv->buf[bufv->idx].size - bufv->off;
assert((int) length >= 0);
assert((offset + length) <= AZNFSC_MAX_FILE_SIZE);
/*
* TODO: Investigate using splice for zero copy.
*/
const char *buf = (char *) bufv->buf[bufv->idx].mem + bufv->off;
int err = 0;
bool inject_eagain = false;
/*
* Get bytes_chunk(s) covering the range [offset, offset+length).
* We need to copy application data to those.
*/
std::vector<bytes_chunk> bc_vec =
filecache_handle->getx(offset, length, extent_left, extent_right);
size_t remaining = length;
for (auto& bc : bc_vec) {
struct membuf *mb = bc.get_membuf();
#ifdef ENABLE_PARANOID
bool found_not_uptodate = false;
if (!err && inject_error()) {
err = EAGAIN;
AZLogWarn("[{}] PP: copy_to_cache(): injecting EAGAIN for membuf "
"[{}, {}) (bc [{}, {})), length={}, remaining={}",
ino, mb->offset.load(), mb->offset.load()+mb->length.load(),
bc.offset, bc.offset+bc.length,
length, remaining);
}
#endif
/*
* If we have already failed with EAGAIN, just drain the bc_vec
* clearing the inuse count for all the bytes_chunk.
*
* TODO: If we have copied at least one byte, do not fail but instead
* let the caller know that we copied ledd.
*/
if (err == EAGAIN) {
mb->clear_inuse();
assert(remaining >= bc.length);
remaining -= bc.length;
continue;
}
/*
* Lock the membuf while we copy application data into it.
*/
mb->set_locked();
/*
* If we own the full membuf we can safely copy to it, also if the
* membuf is uptodate we can safely copy to it. In both cases the
* membuf remains uptodate after the copy.
*
* TODO: We need to handle the case where application writes on
* the file range in commit_pending state.
*/
try_copy:
if ((bc.maps_full_membuf() || mb->is_uptodate()) &&
!mb->is_commit_pending()) {
assert(bc.length <= remaining);
::memcpy(bc.get_buffer(), buf, bc.length);
mb->set_uptodate();
mb->set_dirty();
// Update file size in inode'c cached attr.
on_cached_write(bc.offset, bc.length);
} else {
#ifdef ENABLE_PARANOID
/*
* Once we find the membuf uptodate, after waiting, and run
* try_copy again, we must not find the membuf not-uptodate
* again.
*/
assert(!found_not_uptodate);
found_not_uptodate = true;
#endif
/*
* bc refers to part of the membuf and membuf is not uptodate.
* This can happen if our bytes_chunk_cache::get() call raced with
* some other thread and they requested a bigger bytes_chunk than
* us. The original bytes_chunk was allocated per their request
* and our request was smaller one that fitted completely within
* their request and and hence we were given the same membuf,
* albeit a smaller bytes_chunk. Now both the threads would next
* try to lock the membuf to perform their corresponding IO, this
* time we won the race and hence when we look at the membuf it's
* a partial one and not uptodate. Since membuf is not uptodate
* we will need to do a read-modify-write operation to correctly
* update part of the membuf. Since we know that some other thread
* is waiting to perform IO on the entire membuf, we simply let
* that thread proceed with its IO. Once it's done the membuf will
* be uptodate and then we can perform the simple copy.
* We wait for 50 msecs after releasing the lock to let the other
* thread get the lock. Once it gets the lock it'll only release
* it after it performs the IO. So, after we reacquire the lock
* if the membuf is not uptodate it implies that the other thread
* wasn't able to mark the membuf uptodate. In this case we need
* to get fresh bytes_chunk vector and re-do the copy.
*/
const uint64_t rand_ms = random_number(1, 50);
AZLogWarn("[{}] Waiting for membuf [{}, {}) (bc [{}, {})) to "
"become uptodate, dropping lock for {} msecs", ino,
mb->offset.load(), mb->offset.load()+mb->length.load(),
bc.offset, bc.offset+bc.length, rand_ms);
mb->clear_locked();
::usleep(rand_ms * 1000);
mb->set_locked();
#ifdef ENABLE_PARANOID
inject_eagain = inject_error();
#endif
if (mb->is_uptodate() && !inject_eagain) {
AZLogWarn("[{}] Membuf [{}, {}) (bc [{}, {})) is now uptodate, "
"retrying copy", ino,
mb->offset.load(), mb->offset.load()+mb->length.load(),
bc.offset, bc.offset+bc.length);
goto try_copy;
} else {
AZLogWarn("[{}] {}Membuf [{}, {}) (bc [{}, {})) not marked "
"uptodate by other thread, returning EAGAIN",
ino, inject_eagain ? "PP: " : "",
mb->offset.load(), mb->offset.load()+mb->length.load(),
bc.offset, bc.offset+bc.length);
assert(err == 0);
err = EAGAIN;
/*
* Release the membuf before returning, so that when the caller
* calls us again we get a new "full" membuf not this partial
* membuf again, else we will be stuck in a loop.
* We need to drop the inuse count for release() to work, then
* re-acquire it for subsequent code to work.
*/
mb->clear_inuse();
filecache_handle->release(mb->offset, mb->length);
mb->set_inuse();
}
}
/*
* Done with the copy, release the membuf lock and clear inuse.
* The membuf is marked dirty so it's safe against cache prune/release.
* When we decide to flush this dirty membuf that time it'll be duly
* locked.
*/
mb->clear_locked();
mb->clear_inuse();
buf += bc.length;
assert(remaining >= bc.length);
remaining -= bc.length;
}
assert(remaining == 0);
return err;
}