in src/fuse/FuseOps.cc [1552:1689]
// FUSE low-level `write` handler.
//
// Early exits:
//  * user config `dryrun_bench_mode`: replies success for `size` bytes without writing anything.
//  * user config `readonly`: replies EROFS.
//
// Direct path (file handle opened with O_DIRECT, or io_bufs.write_buf_size == 0):
//  any data still buffered for the inode is flushed first. `buf` is then copied into an
//  IOBuffer taken from d.bufPool and written with flushBuf(). On success, the value returned by
//  flushBuf() is replied as the written byte count.
//
// Buffered path: `buf` is appended to the inode's write buffer, which is created and registered
//  with the storage client on first use. Pending data that is not contiguous with `off` is
//  flushed first, and the buffer is flushed every time it fills up. Bytes left in the buffer
//  cause the inode to be inserted into dirtyInodes so a background task flushes it later.
//  Replies `size`.
//
// Whenever flushBuf() returns a negative value this function returns without replying.
// NOTE(review): presumably flushBuf() has already replied with the error in that case — confirm.
void hf3fs_write(fuse_req_t req, fuse_ino_t fino, const char *buf, size_t size, off_t off, struct fuse_file_info *fi) {
  static auto mountName = d.fuseRemountPref.value_or(d.fuseMountpoint).native();
  static monitor::LatencyRecorder writeLatency("fuse.write.latency", monitor::TagSet{{"mount_name", mountName}});
  static monitor::DistributionRecorder writeDist("fuse.write.size", monitor::TagSet{{"mount_name", mountName}});

  const auto *ctx = fuse_req_ctx(req);
  auto uid = ctx->uid;
  auto uids = std::to_string(uid);
  auto start = SteadyClock::now();

  auto userInfo = UserInfo(flat::Uid(uid), flat::Gid(ctx->gid), d.fuseToken);
  // Look the per-user config up once and use it for both checks.
  auto &&userCfg = d.userConfig.getConfig(userInfo);
  if (userCfg.dryrun_bench_mode()) {
    fuse_reply_write(req, size);
    return;
  }
  if (userCfg.readonly()) {
    fuse_reply_err(req, EROFS);
    return;
  }
  writeDist.addSample(size, monitor::TagSet{{"uid", uids}});

  auto ino = real_ino(fino);
  XLOGF(OP_LOG_LEVEL,
        "hf3fs_write(ino={}, buf={}, size={}, off={}, pid={})",
        ino,
        reinterpret_cast<uintptr_t>(buf),
        size,
        off,
        ctx->pid);
  record("write", uid);

  auto odir = reinterpret_cast<FileHandle *>(fi->fh)->oDirect;
  auto pi = inodeOf(*fi, ino);
  // Guards the inode's write buffer (pi->writeBuf and its contents).
  std::unique_lock lock(pi->wbMtx);
  auto wb = pi->writeBuf;

  if (odir || !d.config->io_bufs().write_buf_size()) {
    if (!wb) {
      // No write buffer exists for this inode; release the lock before the direct write.
      lock.unlock();
    } else if (wb->len) {
      // Write out previously buffered data before this direct write.
      XLOGF(DBG, "to flush for o_direct fd off {} prev off {} len {}", off, wb->off, wb->len);
      auto flushRet = flushBuf(req, pi, wb->off, *wb->memh, wb->len, true);
      wb->len = 0;
      if (flushRet < 0) {
        return;
      }
    }

    // NOTE(review): assumes every d.bufPool buffer holds at least `size` bytes (e.g. bounded by
    // the FUSE max_write setting) — confirm, otherwise this memcpy overflows.
    auto memh = IOBuffer(folly::coro::blocking_wait(d.bufPool->allocate()));
    memcpy((char *)memh.data(), buf, size);
    auto ret = flushBuf(req, pi, off, memh, size, false);
    writeLatency.addSample(SteadyClock::now() - start, monitor::TagSet{{"uid", uids}});
    if (ret >= 0) {
      fuse_reply_write(req, ret);
    }
    return;
  }

  if (!wb) {
    // First buffered write on this inode: allocate the buffer and register it with the storage client.
    auto newWb = std::make_shared<InodeWriteBuf>();
    newWb->buf.resize(d.config->io_bufs().write_buf_size());
    auto ret = d.storageClient->registerIOBuffer(newWb->buf.data(), newWb->buf.size());
    if (!ret) {
      handle_error(req, ret);
      return;
    }
    newWb->memh.reset(new storage::client::IOBuffer(std::move(*ret)));
    pi->writeBuf = newWb;
    wb = newWb;
  }

  // Buffered data can only be extended by a write that starts exactly where it ends.
  if (wb->len && wb->off + static_cast<ssize_t>(wb->len) != off) {
    XLOGF(DBG, "to flush due to inconsecutive off {} prev off {} len {}", off, wb->off, wb->len);
    auto flushRet = flushBuf(req, pi, wb->off, *wb->memh, wb->len, true);
    wb->len = 0;
    if (flushRet < 0) {
      return;
    }
  }
  if (!wb->len) {
    wb->off = off;
  }

  size_t done = 0;
  do {
    auto cplen = std::min(size - done, wb->buf.size() - wb->len);
    memcpy(wb->buf.data() + wb->len, buf + done, cplen);
    wb->len += cplen;
    done += cplen;
    if (wb->len == wb->buf.size()) {
      XLOGF(DBG, "flush full buf {}", wb->len);
      auto flushRet = flushBuf(req, pi, wb->off, *wb->memh, wb->len, true);
      wb->len = 0;
      if (flushRet < 0) {
        return;
      }
      // The next buffered byte is the first not-yet-copied byte of this request.
      wb->off = off + done;
    }
  } while (done < size);

  if (wb->len) {
    // notify background task to flush buf and sync
    getFuseClientsInstance().dirtyInodes.lock()->insert(ino);
  }
  writeLatency.addSample(SteadyClock::now() - start, monitor::TagSet{{"uid", uids}});
  fuse_reply_write(req, size);
}