in src/fuse/IoRing.cc [67:284]
// Processes one batch of `toProc` submission entries (SQEs) starting at ring
// slot `spt` (wrapping modulo `entries`) and posts exactly one completion entry
// (CQE) per submission.
//
// Flow:
//   1. Copy the batch's SQEs and their IoArgs out of the ring once, so one
//      consistent view of every request is used for buffer registration,
//      I/O submission, finishWrite and the CQEs.
//   2. Resolve inodes and I/O buffers via `lookupFiles` / `lookupBufs`. Each is
//      called on the run [spt, entries) and, if the batch wraps, again on the
//      remainder starting at slot 0; each callback is expected to append one
//      result per SQE it is given.
//   3. Queue every valid request on a lib::agent::PioV executor and execute the
//      batch as one read or write. Writes are bracketed by
//      RcInode::beginWrite / RcInode::finishWrite (finishWrite is only issued
//      for writes whose beginWrite succeeded).
//   4. Under cqeMtx_, advance sqeTail (only once every earlier in-flight batch
//      has finished), push the CQEs and decrement processing_; then post cqeSem.
//
// Per-entry results (`res`) are >= 0 on success (presumably the transferred byte
// count filled in by PioV -- TODO confirm) or a negated status code on failure.
// Failures are translated to a negated errno via StatusCode::toErrno in the CQE.
// Writes are rejected with kReadOnlyMode when the user's config is read-only.
//
// @param spt           ring slot of the first SQE in this batch
// @param toProc        number of SQEs in this batch
// @param storageClient storage client used to execute the I/O
// @param storageIo     read/write I/O options (reads may enable allowReadUncommitted)
// @param userConfig    per-user config source (readonly, chunk_size_limit)
// @param lookupFiles   appends one inode per SQE; null entries fail with kNotFile
// @param lookupBufs    appends one shm buffer lookup result per SQE
CoTask<void> IoRing::process(
    int spt,
    int toProc,
    storage::client::StorageClient &storageClient,
    const storage::client::IoOptions &storageIo,
    UserConfig &userConfig,
    std::function<void(std::vector<std::shared_ptr<RcInode>> &, const IoArgs *, const IoSqe *, int)> &&lookupFiles,
    std::function<void(std::vector<Result<lib::ShmBufForIO>> &, const IoArgs *, const IoSqe *, int)> &&lookupBufs) {
  // NOTE(review): these recorders are function-local statics, so their
  // "mount_name" tag is the mountName of whichever IoRing calls process() first;
  // rings belonging to other mounts report under that same tag.
  static monitor::LatencyRecorder overallLatency("usrbio.piov.overall", monitor::TagSet{{"mount_name", mountName}});
  static monitor::LatencyRecorder prepareLatency("usrbio.piov.prepare", monitor::TagSet{{"mount_name", mountName}});
  static monitor::LatencyRecorder submitLatency("usrbio.piov.submit", monitor::TagSet{{"mount_name", mountName}});
  static monitor::LatencyRecorder completeLatency("usrbio.piov.complete", monitor::TagSet{{"mount_name", mountName}});
  static monitor::DistributionRecorder ioSizeDist("usrbio.piov.io_size", monitor::TagSet{{"mount_name", mountName}});
  static monitor::DistributionRecorder ioDepthDist("usrbio.piov.io_depth", monitor::TagSet{{"mount_name", mountName}});
  static monitor::DistributionRecorder totalBytesDist("usrbio.piov.total_bytes",
                                                      monitor::TagSet{{"mount_name", mountName}});
  static monitor::DistributionRecorder distinctFilesDist("usrbio.piov.distinct_files",
                                                         monitor::TagSet{{"mount_name", mountName}});
  static monitor::DistributionRecorder distinctBufsDist("usrbio.piov.distinct_bufs",
                                                        monitor::TagSet{{"mount_name", mountName}});
  static monitor::CountRecorder bwCount("usrbio.piov.bw", monitor::TagSet{{"mount_name", mountName}});

  auto start = SteadyClock::now(), overallStart = start;
  std::string ioType = forRead_ ? "read" : "write";
  auto uids = std::to_string(userInfo_.uid.toUnderType());
  // Every sample recorded by this call carries the same tags; build them once.
  monitor::TagSet tags{{"io", ioType}, {"uid", uids}};
  auto &config = userConfig.getConfig(userInfo_);

  // Copy this batch's SQEs out of the ring once. The ring is shared with the
  // submitting (io generator) process; every later use in this function --
  // argument snapshot, finishWrite, CQEs -- goes through this copy.
  std::vector<IoSqe> sqes(toProc);
  for (int i = 0; i < toProc; ++i) {
    sqes[i] = sqeSection[(spt + i) % entries];
  }

  std::vector<ssize_t> res;
  if (!forRead_ && config.readonly()) {
    res = std::vector<ssize_t>(toProc, static_cast<ssize_t>(-StatusCode::kReadOnlyMode));
  } else {
    res = std::vector<ssize_t>(toProc, 0);

    // Snapshot the IoArgs too. Reading them through a reference into the shared
    // ring would let ioLen / fileOff change between registering the buffer
    // (memh), queuing the I/O and finishing the write.
    std::vector<IoArgs> ioArgs;
    ioArgs.reserve(toProc);
    for (const auto &sqe : sqes) {
      ioArgs.push_back(ringSection[sqe.index]);
    }

    // The batch may wrap past the end of the ring: look up [spt, entries) first,
    // then the remainder starting from slot 0.
    const auto firstRun = std::min(toProc, entries - spt);
    std::vector<std::shared_ptr<RcInode>> inodes;
    inodes.reserve(toProc);
    lookupFiles(inodes, ringSection, sqeSection + spt, firstRun);
    if ((int)inodes.size() < toProc) {
      lookupFiles(inodes, ringSection, sqeSection, toProc - (int)inodes.size());
    }
    std::vector<Result<lib::ShmBufForIO>> bufs;
    bufs.reserve(toProc);
    lookupBufs(bufs, ringSection, sqeSection + spt, firstRun);
    if ((int)bufs.size() < toProc) {
      lookupBufs(bufs, ringSection, sqeSection, toProc - (int)bufs.size());
    }

    lib::agent::PioV ioExec(storageClient, config.chunk_size_limit(), res);
    // Writes only: truncate version returned by beginWrite, and whether
    // beginWrite succeeded (only those writes get a matching finishWrite).
    std::vector<uint64_t> truncateVers;
    std::vector<uint8_t> writeBegun;
    if (!forRead_) {
      truncateVers.resize(toProc, 0);
      writeBegun.resize(toProc, 0);
    }

    size_t iod = 0, totalBytes = 0;
    std::set<uint64_t> distinctFiles;
    std::set<Uuid> distinctBufs;
    for (int i = 0; i < toProc; ++i) {
      const auto &args = ioArgs[i];
      ++iod;
      totalBytes += args.ioLen;
      distinctFiles.insert(args.fileIid);
      Uuid id;
      memcpy(id.data, args.bufId, sizeof(id.data));
      distinctBufs.insert(id);
      ioSizeDist.addSample(args.ioLen, tags);

      if (!inodes[i]) {
        res[i] = -static_cast<ssize_t>(MetaCode::kNotFile);
        continue;
      }
      if (!bufs[i]) {
        res[i] = -static_cast<ssize_t>(bufs[i].error().code());
        continue;
      }
      auto memh = co_await bufs[i]->memh(args.ioLen);
      if (!memh) {
        res[i] = -static_cast<ssize_t>(memh.error().code());
        continue;
      } else if (!bufs[i]->ptr() || !*memh) {
        XLOGF(ERR, "{} is null when doing usrbio", *memh ? "buf ptr" : "memh");
        res[i] = -static_cast<ssize_t>(ClientAgentCode::kIovShmFail);
        continue;
      }
      if (!forRead_) {
        auto beginWrite =
            co_await inodes[i]->beginWrite(userInfo_, *getFuseClientsInstance().metaClient, args.fileOff, args.ioLen);
        if (beginWrite.hasError()) {
          res[i] = -static_cast<ssize_t>(beginWrite.error().code());
          continue;
        }
        truncateVers[i] = *beginWrite;
        writeBegun[i] = 1;
      }
      auto addRes = forRead_
                        ? ioExec.addRead(i, inodes[i]->inode, 0, args.fileOff, args.ioLen, bufs[i]->ptr(), **memh)
                        : ioExec.addWrite(i, inodes[i]->inode, 0, args.fileOff, args.ioLen, bufs[i]->ptr(), **memh);
      if (!addRes) {
        res[i] = -static_cast<ssize_t>(addRes.error().code());
      }
    }

    auto now = SteadyClock::now();
    prepareLatency.addSample(now - start, tags);
    start = now;
    ioDepthDist.addSample(iod, tags);
    totalBytesDist.addSample(totalBytes, tags);
    distinctFilesDist.addSample(distinctFiles.size(), tags);
    distinctBufsDist.addSample(distinctBufs.size(), tags);

    auto readOpt = storageIo.read();
    if (flags_ & HF3FS_IOR_ALLOW_READ_UNCOMMITTED) {
      readOpt.set_allowReadUncommitted(true);
    }
    auto execRes = co_await (forRead_ ? ioExec.executeRead(userInfo_, readOpt)
                                      : ioExec.executeWrite(userInfo_, storageIo.write()));
    now = SteadyClock::now();
    submitLatency.addSample(now - start, tags);
    start = now;
    if (!execRes) {
      // A batch-level failure overrides every entry that had not already failed.
      for (auto &r : res) {
        if (r >= 0) {
          r = -static_cast<ssize_t>(execRes.error().code());
        }
      }
    } else {
      ioExec.finishIo(!(flags_ & HF3FS_IOR_FORBID_READ_HOLES));
    }

    if (!forRead_) {
      for (int i = 0; i < toProc; ++i) {
        // writeBegun[i] implies inodes[i] is non-null.
        if (writeBegun[i]) {
          inodes[i]->finishWrite(userInfo_.uid, truncateVers[i], ioArgs[i].fileOff, res[i]);
        }
      }
    }
  }

  auto newSpt = (spt + toProc) % entries;
  {
    // lock for between threads (io workers)
    // atomics for between processes (io worker & io generator)
    std::lock_guard lock(cqeMtx_);
    if (sqeProcTails_.empty()) {
      XLOGF(FATAL, "bug?! sqeProcTails_ is empty");
    }
    if (sqeProcTails_.front() != newSpt) {
      // An earlier batch is still in flight: remember that this one is done so
      // sqeTail can jump past it once the earlier batches finish.
      sqeDoneTails_.insert(newSpt);
    } else {
      sqeTail = newSpt;
      sqeProcTails_.pop_front();
      // Drain later batches that already completed, in submission order.
      while (!sqeDoneTails_.empty()) {
        if (sqeProcTails_.empty()) {
          XLOGF(FATAL, "bug?! sqeProcTails_ is empty");
        }
        auto first = sqeProcTails_.front();
        auto it = sqeDoneTails_.find(first);
        if (it == sqeDoneTails_.end()) {
          break;
        } else {
          sqeTail = first;
          sqeProcTails_.pop_front();
          sqeDoneTails_.erase(it);
        }
      }
    }
    for (int i = 0; i < toProc; ++i) {
      auto &sqe = sqes[i];
      auto r = res[i];
      auto addRes = addCqe(sqe.index, r >= 0 ? r : -static_cast<ssize_t>(StatusCode::toErrno(-r)), sqe.userdata);
      if (!addRes) {
        XLOGF(FATAL, "failed to add cqe");
      }
    }
    processing_ -= toProc;
  }
  sem_post(cqeSem.get());

  size_t doneBytes = 0;
  for (auto r : res) {
    if (r > 0) {
      doneBytes += r;
    }
  }
  bwCount.addSample(doneBytes, tags);
  auto now = SteadyClock::now();
  completeLatency.addSample(now - start, tags);
  overallLatency.addSample(now - overallStart, tags);
}