in eden/fs/fuse/FuseChannel.cpp [1554:1837]
// Request loop for this FUSE channel: repeatedly reads one request from
// fuseDevice_ and dispatches it, until stop_ is observed or the session is
// asked to exit.
//
// Each iteration:
//   1. read() one message into a local buffer of bufferSize_ bytes.
//   2. On read failure, either retry (EINTR, EAGAIN, ENOENT) or request
//      session exit (ENODEV => UNMOUNTED, anything else => FUSE_READ_ERROR).
//   3. Reject reads shorter than a fuse_in_header.
//   4. Answer FUSE_GETXATTR for a small fixed set of xattr names with ENODATA
//      without going through the dispatcher.
//   5. Refuse (EIO) any request whose pid equals our own pid.
//   6. Record the access and switch on the opcode: a handful of opcodes are
//      answered inline, everything else goes through the handler table, and
//      opcodes with no handler get ENOSYS.
//
// Ways out of this function:
//   - the loop ends (break / loop condition) when stop_ is set, or after
//     requestSessionExit() on ENODEV or an unexpected read error;
//   - it returns directly on a zero-length or truncated read, or when
//     replying ENOSYS for an unhandled opcode throws std::system_error;
//   - it throws std::runtime_error if FUSE_INIT arrives here.
//
// NOTE(review): the stoppedThreads == numThreads_ check below suggests this
// runs concurrently on several worker threads -- confirm against the caller.
void FuseChannel::processSession() {
// One receive buffer for the whole loop; it is overwritten by every read().
std::vector<char> buf(bufferSize_);
// Save this for the sanity check later in the loop to avoid
// additional syscalls on each loop iteration.
auto myPid = getpid();
while (!stop_.load(std::memory_order_relaxed)) {
// TODO: FUSE_SPLICE_READ allows using splice(2) here if we enable it.
// We can look at turning this on once the main plumbing is complete.
auto res = read(fuseDevice_.fd(), buf.data(), buf.size());
if (UNLIKELY(res < 0)) {
// Capture errno right away, before any later call can overwrite it.
int error = errno;
// If a stop was requested while we were in read(), leave the loop without
// looking at the error code at all.
if (stop_.load(std::memory_order_relaxed)) {
break;
}
if (error == EINTR || error == EAGAIN) {
// If we got interrupted by a signal while reading the next
// fuse command, we will simply retry and read the next thing.
continue;
} else if (error == ENOENT) {
// According to comments in the libfuse code:
// ENOENT means the operation was interrupted; it's safe to restart
continue;
} else if (error == ENODEV) {
// ENODEV means the filesystem was unmounted
// call_once on unmountLogFlag_ keeps this message from being logged more
// than once for that flag.
folly::call_once(unmountLogFlag_, [this] {
XLOG(DBG3) << "received unmount event ENODEV on mount " << mountPath_;
});
requestSessionExit(StopReason::UNMOUNTED);
break;
} else {
// Any other errno is treated as fatal for the session.
XLOG(WARNING) << "error reading from fuse channel: "
<< folly::errnoStr(error);
requestSessionExit(StopReason::FUSE_READ_ERROR);
break;
}
}
// res is known to be non-negative here, so the cast to size_t is safe.
const auto arg_size = static_cast<size_t>(res);
if (arg_size < sizeof(struct fuse_in_header)) {
if (arg_size == 0) {
// This code path is hit when a fake FUSE channel is closed in our unit
// tests. On real FUSE channels we should get ENODEV to indicate that
// the FUSE channel was shut down. However, in our unit tests that use
// fake FUSE connections we cannot send an ENODEV error, and so we just
// close the channel instead.
requestSessionExit(StopReason::UNMOUNTED);
} else {
// We got a partial FUSE header. This shouldn't ever happen unless
// there is a bug in the FUSE kernel code.
XLOG(ERR) << "read truncated message from kernel fuse device: len="
<< arg_size;
requestSessionExit(StopReason::FUSE_TRUNCATED_REQUEST);
}
// Unlike the read-error paths above, both short-read cases return from the
// function directly instead of breaking out of the loop.
return;
}
// The header sits at the start of buf; arg covers the bytes that follow it,
// up to the number of bytes actually read. Both point into buf, which is
// reused by the next read().
// NOTE(review): handlers presumably have to consume or copy arg before they
// return, since it does not own its bytes -- confirm.
const auto* header = reinterpret_cast<fuse_in_header*>(buf.data());
const ByteRange arg{
reinterpret_cast<const uint8_t*>(header + 1),
arg_size - sizeof(fuse_in_header)};
XLOG(DBG7) << "fuse request opcode=" << header->opcode << " "
<< fuseOpcodeName(header->opcode) << " unique=" << header->unique
<< " len=" << header->len << " nodeid=" << header->nodeid
<< " uid=" << header->uid << " gid=" << header->gid
<< " pid=" << header->pid;
// On Linux, if security caps are enabled and the FUSE filesystem implements
// xattr support, every FUSE_WRITE opcode is preceded by FUSE_GETXATTR for
// "security.capability". Until we discover a way to tell the kernel that
// they will always return nothing in an Eden mount, short-circuit that path
// as efficiently and as early as possible.
//
// On some systems, the kernel also frequently requests
// POSIX ACL xattrs, so fast track those too, if only to make strace
// logs easier to follow.
//
// Note that this fast path runs before the own-pid check, the access log
// and the trace bus below, so fast-tracked requests reach none of them.
if (header->opcode == FUSE_GETXATTR) {
// The xattr name is read from the bytes immediately after the
// fuse_getxattr_in struct.
// NOTE(review): nothing here verifies that arg is at least
// sizeof(fuse_getxattr_in) bytes long or that the name is NUL-terminated
// within arg_size; this relies on the kernel sending well-formed requests
// -- confirm that is acceptable for fake channels used in tests too.
const auto getxattr =
reinterpret_cast<const fuse_getxattr_in*>(arg.data());
// Evaluate strlen before the comparison loop below.
const StringPiece namePiece{reinterpret_cast<const char*>(getxattr + 1)};
static constexpr StringPiece kFastTracks[] = {
"security.capability",
"system.posix_acl_access",
"system.posix_acl_default"};
// Unclear whether one strlen and matching compares is better than
// strcmps, but it's probably in the noise.
bool matched = false;
for (auto fastTrack : kFastTracks) {
if (namePiece == fastTrack) {
replyError(*header, ENODATA);
matched = true;
break;
}
}
// A fast-tracked request has already been answered; go read the next one.
if (matched) {
continue;
}
}
// Sanity check to ensure that the request wasn't from ourself.
//
// We should never make requests to ourself via normal filesystem
// operations going through the kernel. Otherwise we risk deadlocks if the
// kernel calls us while holding an inode lock, and we then end up making a
// filesystem call that needs the same inode lock. We will then not be able
// to resolve this deadlock on kernel inode locks without rebooting the
// system.
if (UNLIKELY(static_cast<pid_t>(header->pid) == myPid)) {
// Reply first so the caller is unblocked, then log.
replyError(*header, EIO);
XLOG(CRITICAL) << "Received FUSE request from our own pid: opcode="
<< header->opcode << " nodeid=" << header->nodeid
<< " pid=" << header->pid;
continue;
}
// handlerEntry may be null; opcodes without an entry are recorded as
// AccessType::FsChannelOther and end up in the ENOSYS path of the default
// case unless one of the explicit cases below matches first.
auto* handlerEntry = lookupFuseHandlerEntry(header->opcode);
processAccessLog_.recordAccess(
header->pid,
handlerEntry ? handlerEntry->accessType : AccessType::FsChannelOther);
switch (header->opcode) {
case FUSE_INIT:
// FUSE_INIT is not expected in this loop: answer EPROTO, then throw.
replyError(*header, EPROTO);
throw std::runtime_error(
"received FUSE_INIT after we have been initialized!?");
case FUSE_GETLK:
case FUSE_SETLK:
case FUSE_SETLKW:
// Deliberately not handling locking; this causes
// the kernel to do it for us
XLOG(DBG7) << fuseOpcodeName(header->opcode);
replyError(*header, ENOSYS);
break;
#ifdef __linux__
case FUSE_LSEEK:
// We only support stateless file handles, so lseek() is meaningless
// for us. Returning ENOSYS causes the kernel to implement it for us,
// and will cause it to stop sending subsequent FUSE_LSEEK requests.
XLOG(DBG7) << "FUSE_LSEEK";
replyError(*header, ENOSYS);
break;
#endif
case FUSE_POLL:
// We do not currently implement FUSE_POLL.
XLOG(DBG7) << "FUSE_POLL";
replyError(*header, ENOSYS);
break;
case FUSE_INTERRUPT: {
// no reply is required
XLOG(DBG7) << "FUSE_INTERRUPT";
// Ignore it: we don't have a reliable way to guarantee
// that interrupting functions correctly.
// In addition, the kernel (certainly on macOS) may recycle
// ids too quickly for us to safely track by `unique` id.
break;
}
case FUSE_DESTROY:
XLOG(DBG7) << "FUSE_DESTROY";
dispatcher_->destroy();
// FUSE on linux doesn't care whether we reply to FUSE_DESTROY
// but the macOS implementation blocks the unmount syscall until
// we have responded, which in turn blocks our attempt to gracefully
// unmount, so we respond here. It doesn't hurt Linux to respond
// so we do it for both platforms.
replyError(*header, 0);
break;
case FUSE_NOTIFY_REPLY:
XLOG(DBG7) << "FUSE_NOTIFY_REPLY";
// Don't strictly need to do anything here, but may want to
// turn the kernel notifications in Futures and use this as
// a way to fulfil the promise
break;
case FUSE_IOCTL:
// Rather than the default ENOSYS, we need to return ENOTTY
// to indicate that the requested ioctl is not supported
replyError(*header, ENOTTY);
break;
default: {
// Table-driven dispatch: used when the opcode has an entry with a
// non-null handler. Otherwise control falls through to the
// "unhandled opcode" code after this if-block.
if (handlerEntry && handlerEntry->handler) {
auto requestId = generateUniqueID();
// Publish the start-of-request trace event; rendered arguments are
// included only when the entry has an argRenderer and detailed argument
// tracing is currently switched on.
if (handlerEntry->argRenderer &&
traceDetailedArguments_->load(std::memory_order_acquire)) {
traceBus_->publish(FuseTraceEvent::start(
requestId, *header, handlerEntry->argRenderer(arg)));
} else {
traceBus_->publish(FuseTraceEvent::start(requestId, *header));
}
// This is a shared_ptr because, due to timeouts, the internal request
// lifetime may not match the FUSE request lifetime, so we capture it
// in both. I'm sure this could be improved with some cleverness.
auto request =
RequestContext::makeSharedRequestContext<FuseRequestContext>(
this, *header);
// Count the request as pending before starting it; the matching decrement
// is in the final ensure() callback below.
++state_.wlock()->pendingRequests;
// Copy the header by value: *header points into buf, which is overwritten
// by the next read(), while the completion callback may run later.
auto headerCopy = *header;
FB_LOG(*straceLogger_, DBG7, ([&]() -> std::string {
std::string rendered;
if (handlerEntry->argRenderer) {
rendered = handlerEntry->argRenderer(arg);
}
return fmt::format(
"{}({}{}{})",
handlerEntry->getShortName(),
headerCopy.nodeid,
rendered.empty() ? "" : ", ",
rendered);
})());
// Build and launch the request future chain:
//   - the makeFutureWith lambda starts the request (stats / live-request
//     watch bookkeeping) and invokes the member-function handler;
//   - the empty ensure([request]) holds a reference to the request object
//     (presumably to keep it alive until the handler's future completes,
//     even if the timeout fires first -- TODO confirm);
//   - within(requestTimeout_) applies the request timeout;
//   - catchErrors(..., notifier_.get()) wraps the whole future;
//   - the final ensure() publishes the finish trace event and does the
//     pending-request accounting.
// NOTE(review): the first lambda captures by reference (handlerEntry, arg,
// request); this relies on makeFutureWith invoking it before this loop
// iteration's locals go away -- confirm.
request
->catchErrors(
folly::makeFutureWith([&] {
request->startRequest(
dispatcher_->getStats(),
handlerEntry->stat,
*(liveRequestWatches_.get()));
return (this->*handlerEntry->handler)(
*request, request->getReq(), arg)
.semi()
.via(&folly::QueuedImmediateExecutor::instance());
}).ensure([request] {
}).within(requestTimeout_),
notifier_.get())
.ensure([this, request, requestId, headerCopy] {
traceBus_->publish(FuseTraceEvent::finish(
requestId, headerCopy, request->getResult()));
// We may be complete; check to see if all requests are
// done and whether there are any threads remaining.
auto state = state_.wlock();
XCHECK_NE(state->pendingRequests, 0u)
<< "pendingRequests double decrement";
// The locked state is moved into sessionComplete() when this was the last
// pending request and every thread has already stopped.
if (--state->pendingRequests == 0 &&
state->stoppedThreads == numThreads_) {
sessionComplete(std::move(state));
}
});
break;
}
// No usable handler for this opcode: remember it in unhandledOpcodes_ and
// reply ENOSYS. The first callback reports whether the opcode is already in
// the set; the second logs a warning and inserts it.
// NOTE(review): presumably tryRlockCheckBeforeUpdate only runs the second
// callback when the first returns nullopt, so each opcode is warned about
// once -- confirm against the helper's definition.
const auto opcode = header->opcode;
tryRlockCheckBeforeUpdate<folly::Unit>(
unhandledOpcodes_,
[&](const auto& unhandledOpcodes) -> std::optional<folly::Unit> {
if (unhandledOpcodes.find(opcode) != unhandledOpcodes.end()) {
return folly::unit;
}
return std::nullopt;
},
[&](auto& unhandledOpcodes) -> folly::Unit {
XLOG(WARN) << "unhandled fuse opcode " << opcode << "("
<< fuseOpcodeName(opcode) << ")";
unhandledOpcodes->insert(opcode);
return folly::unit;
});
// This is the only replyError() call in this function that is guarded: a
// std::system_error here ends the session with FUSE_WRITE_ERROR and
// returns from the function.
try {
replyError(*header, ENOSYS);
} catch (const std::system_error& exc) {
XLOG(ERR) << "Failed to write error response to fuse: " << exc.what();
requestSessionExit(StopReason::FUSE_WRITE_ERROR);
return;
}
break;
}
}
}
}