void FuseChannel::processSession()

in eden/fs/fuse/FuseChannel.cpp [1554:1837]


/**
 * Main request loop for a FUSE worker: repeatedly read one request from
 * fuseDevice_ and either answer it inline or hand it to the matching handler.
 *
 * The loop runs until stop_ is observed, or until one of the conditions below
 * calls requestSessionExit():
 *   - read() fails with ENODEV (StopReason::UNMOUNTED),
 *   - read() fails with any other unexpected errno
 *     (StopReason::FUSE_READ_ERROR),
 *   - read() returns 0 bytes (StopReason::UNMOUNTED),
 *   - read() returns fewer bytes than a fuse_in_header
 *     (StopReason::FUSE_TRUNCATED_REQUEST),
 *   - writing the ENOSYS reply for an unhandled opcode throws
 *     std::system_error (StopReason::FUSE_WRITE_ERROR).
 *
 * Throws std::runtime_error if a FUSE_INIT request arrives; nothing in this
 * function catches it.
 */
void FuseChannel::processSession() {
  // One receive buffer, reused for every read in the loop below.  `header`
  // and `arg` computed in each iteration point into this buffer, so they are
  // only valid until the next read().
  std::vector<char> buf(bufferSize_);
  // Save this for the sanity check later in the loop to avoid
  // additional syscalls on each loop iteration.
  auto myPid = getpid();

  while (!stop_.load(std::memory_order_relaxed)) {
    // TODO: FUSE_SPLICE_READ allows using splice(2) here if we enable it.
    // We can look at turning this on once the main plumbing is complete.
    auto res = read(fuseDevice_.fd(), buf.data(), buf.size());
    if (UNLIKELY(res < 0)) {
      // Capture errno immediately, before any other call can overwrite it.
      int error = errno;
      // If a stop was requested while we were in read(), the error is not
      // interesting; just leave the loop.
      if (stop_.load(std::memory_order_relaxed)) {
        break;
      }

      if (error == EINTR || error == EAGAIN) {
        // If we got interrupted by a signal while reading the next
        // fuse command, we will simply retry and read the next thing.
        // EAGAIN is retried in the same way.
        continue;
      } else if (error == ENOENT) {
        // According to comments in the libfuse code:
        // ENOENT means the operation was interrupted; it's safe to restart
        continue;
      } else if (error == ENODEV) {
        // ENODEV means the filesystem was unmounted
        // call_once on unmountLogFlag_ keeps this message from being logged
        // more than once for that flag.
        folly::call_once(unmountLogFlag_, [this] {
          XLOG(DBG3) << "received unmount event ENODEV on mount " << mountPath_;
        });
        requestSessionExit(StopReason::UNMOUNTED);
        break;
      } else {
        // Any other errno is treated as fatal for this session.
        XLOG(WARNING) << "error reading from fuse channel: "
                      << folly::errnoStr(error);
        requestSessionExit(StopReason::FUSE_READ_ERROR);
        break;
      }
    }

    // res is non-negative here, so the conversion to size_t is lossless.
    const auto arg_size = static_cast<size_t>(res);
    if (arg_size < sizeof(struct fuse_in_header)) {
      if (arg_size == 0) {
        // This code path is hit when a fake FUSE channel is closed in our unit
        // tests.  On real FUSE channels we should get ENODEV to indicate that
        // the FUSE channel was shut down.  However, in our unit tests that use
        // fake FUSE connections we cannot send an ENODEV error, and so we just
        // close the channel instead.
        requestSessionExit(StopReason::UNMOUNTED);
      } else {
        // We got a partial FUSE header.  This shouldn't ever happen unless
        // there is a bug in the FUSE kernel code.
        XLOG(ERR) << "read truncated message from kernel fuse device: len="
                  << arg_size;
        requestSessionExit(StopReason::FUSE_TRUNCATED_REQUEST);
      }
      // Both short-read cases end the loop by returning from the function.
      return;
    }

    // The request header sits at the start of the buffer; `arg` views the
    // bytes that were read after it (opcode-specific payload).
    // NOTE(review): header->len is logged below but is not compared against
    // arg_size here.
    const auto* header = reinterpret_cast<fuse_in_header*>(buf.data());
    const ByteRange arg{
        reinterpret_cast<const uint8_t*>(header + 1),
        arg_size - sizeof(fuse_in_header)};

    XLOG(DBG7) << "fuse request opcode=" << header->opcode << " "
               << fuseOpcodeName(header->opcode) << " unique=" << header->unique
               << " len=" << header->len << " nodeid=" << header->nodeid
               << " uid=" << header->uid << " gid=" << header->gid
               << " pid=" << header->pid;

    // On Linux, if security caps are enabled and the FUSE filesystem implements
    // xattr support, every FUSE_WRITE opcode is preceded by FUSE_GETXATTR for
    // "security.capability". Until we discover a way to tell the kernel that
    // they will always return nothing in an Eden mount, short-circuit that path
    // as efficiently and as early as possible.
    //
    // On some systems, the kernel also frequently requests
    // POSIX ACL xattrs, so fast track those too, if only to make strace
    // logs easier to follow.
    //
    // Requests answered here `continue` before the self-pid check and before
    // processAccessLog_.recordAccess(), so they are not recorded there.
    if (header->opcode == FUSE_GETXATTR) {
      const auto getxattr =
          reinterpret_cast<const fuse_getxattr_in*>(arg.data());

      // Evaluate strlen before the comparison loop below.
      // The attribute name is read as a NUL-terminated string starting right
      // after the fuse_getxattr_in struct.
      // NOTE(review): nothing here checks that `arg` is large enough to hold
      // the struct or that the name is terminated within the bytes read; this
      // relies on the kernel sending well-formed requests.
      const StringPiece namePiece{reinterpret_cast<const char*>(getxattr + 1)};
      static constexpr StringPiece kFastTracks[] = {
          "security.capability",
          "system.posix_acl_access",
          "system.posix_acl_default"};

      // Unclear whether one strlen and matching compares is better than
      // strcmps, but it's probably in the noise.
      bool matched = false;
      for (auto fastTrack : kFastTracks) {
        if (namePiece == fastTrack) {
          replyError(*header, ENODATA);
          matched = true;
          break;
        }
      }
      // A `continue` inside the range-for above would only advance that inner
      // loop, hence the flag to skip to the next request from out here.
      if (matched) {
        continue;
      }
    }

    // Sanity check to ensure that the request wasn't from ourself.
    //
    // We should never make requests to ourself via normal filesystem
    // operations going through the kernel.  Otherwise we risk deadlocks if the
    // kernel calls us while holding an inode lock, and we then end up making a
    // filesystem call that needs the same inode lock.  We will then not be able
    // to resolve this deadlock on kernel inode locks without rebooting the
    // system.
    if (UNLIKELY(static_cast<pid_t>(header->pid) == myPid)) {
      // Reply first so the caller is unblocked before we log.
      replyError(*header, EIO);
      XLOG(CRITICAL) << "Received FUSE request from our own pid: opcode="
                     << header->opcode << " nodeid=" << header->nodeid
                     << " pid=" << header->pid;
      continue;
    }

    // handlerEntry may be null when the opcode has no table entry; in that
    // case the access is recorded as AccessType::FsChannelOther.
    auto* handlerEntry = lookupFuseHandlerEntry(header->opcode);
    processAccessLog_.recordAccess(
        header->pid,
        handlerEntry ? handlerEntry->accessType : AccessType::FsChannelOther);

    // Opcodes listed explicitly below are answered inline on this thread;
    // everything else goes through the handler table in the default case.
    switch (header->opcode) {
      case FUSE_INIT:
        // Reply with EPROTO, then throw.  The exception is not caught in this
        // function, so it propagates to the caller of processSession().
        replyError(*header, EPROTO);
        throw std::runtime_error(
            "received FUSE_INIT after we have been initialized!?");

      case FUSE_GETLK:
      case FUSE_SETLK:
      case FUSE_SETLKW:
        // Deliberately not handling locking; this causes
        // the kernel to do it for us
        XLOG(DBG7) << fuseOpcodeName(header->opcode);
        replyError(*header, ENOSYS);
        break;

#ifdef __linux__
      case FUSE_LSEEK:
        // We only support stateless file handles, so lseek() is meaningless
        // for us.  Returning ENOSYS causes the kernel to implement it for us,
        // and will cause it to stop sending subsequent FUSE_LSEEK requests.
        XLOG(DBG7) << "FUSE_LSEEK";
        replyError(*header, ENOSYS);
        break;
#endif

      case FUSE_POLL:
        // We do not currently implement FUSE_POLL.
        XLOG(DBG7) << "FUSE_POLL";
        replyError(*header, ENOSYS);
        break;

      case FUSE_INTERRUPT: {
        // no reply is required
        XLOG(DBG7) << "FUSE_INTERRUPT";
        // Ignore it: we don't have a reliable way to guarantee
        // that interrupting functions correctly.
        // In addition, the kernel (certainly on macOS) may recycle
        // ids too quickly for us to safely track by `unique` id.
        break;
      }

      case FUSE_DESTROY:
        XLOG(DBG7) << "FUSE_DESTROY";
        dispatcher_->destroy();
        // FUSE on linux doesn't care whether we reply to FUSE_DESTROY
        // but the macOS implementation blocks the unmount syscall until
        // we have responded, which in turn blocks our attempt to gracefully
        // unmount, so we respond here.  It doesn't hurt Linux to respond
        // so we do it for both platforms.
        replyError(*header, 0);
        break;

      case FUSE_NOTIFY_REPLY:
        XLOG(DBG7) << "FUSE_NOTIFY_REPLY";
        // Don't strictly need to do anything here, but may want to
        // turn the kernel notifications into Futures and use this as
        // a way to fulfil the promise
        break;

      case FUSE_IOCTL:
        // Rather than the default ENOSYS, we need to return ENOTTY
        // to indicate that the requested ioctl is not supported
        replyError(*header, ENOTTY);
        break;

      default: {
        // Opcodes with a registered handler are dispatched through the handler
        // table; anything else falls through to the ENOSYS path further down.
        if (handlerEntry && handlerEntry->handler) {
          // Publish a start event on the trace bus.  Arguments are rendered
          // only when the opcode has a renderer and detailed tracing is
          // currently enabled.
          auto requestId = generateUniqueID();
          if (handlerEntry->argRenderer &&
              traceDetailedArguments_->load(std::memory_order_acquire)) {
            traceBus_->publish(FuseTraceEvent::start(
                requestId, *header, handlerEntry->argRenderer(arg)));
          } else {
            traceBus_->publish(FuseTraceEvent::start(requestId, *header));
          }

          // This is a shared_ptr because, due to timeouts, the internal request
          // lifetime may not match the FUSE request lifetime, so we capture it
          // in both. I'm sure this could be improved with some cleverness.
          auto request =
              RequestContext::makeSharedRequestContext<FuseRequestContext>(
                  this, *header);

          // Count the request as pending before starting it; the matching
          // decrement is in the final ensure() callback below.
          ++state_.wlock()->pendingRequests;

          // Copy the header out of `buf`.  The completion callback captures
          // this copy by value, so it does not depend on the receive buffer,
          // which the next loop iteration reads into again.
          auto headerCopy = *header;

          // strace-style log line: "<shortName>(<nodeid>[, <rendered args>])".
          // The string is built by an immediately-invoked lambda passed as the
          // log argument.
          FB_LOG(*straceLogger_, DBG7, ([&]() -> std::string {
            std::string rendered;
            if (handlerEntry->argRenderer) {
              rendered = handlerEntry->argRenderer(arg);
            }
            return fmt::format(
                "{}({}{}{})",
                handlerEntry->getShortName(),
                headerCopy.nodeid,
                rendered.empty() ? "" : ", ",
                rendered);
          })());

          // Start the request and invoke the handler inside makeFutureWith().
          // The handler receives `arg`, which still points into `buf`.
          // NOTE(review): presumably handlers must consume `arg` before
          // returning, since `buf` is reused by the next read() — confirm.
          //
          // The first ensure() has an empty body; it exists only to hold a
          // copy of the `request` shared_ptr on the handler future.
          // NOTE(review): presumably so the context outlives a
          // within(requestTimeout_) timeout — confirm.
          //
          // The outer ensure() publishes the finish trace event and does the
          // pendingRequests bookkeeping under the state_ write lock.
          request
              ->catchErrors(
                  folly::makeFutureWith([&] {
                    request->startRequest(
                        dispatcher_->getStats(),
                        handlerEntry->stat,
                        *(liveRequestWatches_.get()));
                    return (this->*handlerEntry->handler)(
                               *request, request->getReq(), arg)
                        .semi()
                        .via(&folly::QueuedImmediateExecutor::instance());
                  }).ensure([request] {
                    }).within(requestTimeout_),
                  notifier_.get())
              .ensure([this, request, requestId, headerCopy] {
                traceBus_->publish(FuseTraceEvent::finish(
                    requestId, headerCopy, request->getResult()));

                // We may be complete; check to see if all requests are
                // done and whether there are any threads remaining.
                auto state = state_.wlock();
                XCHECK_NE(state->pendingRequests, 0u)
                    << "pendingRequests double decrement";
                if (--state->pendingRequests == 0 &&
                    state->stoppedThreads == numThreads_) {
                  sessionComplete(std::move(state));
                }
              });
          break;
        }

        // No handler for this opcode.  Warn about it and remember the opcode
        // in unhandledOpcodes_.
        // NOTE(review): presumably tryRlockCheckBeforeUpdate runs the first
        // callback under a read lock and only runs the second (under a write
        // lock) when the first returns nullopt, so each opcode is warned about
        // once — confirm against the helper's definition.
        const auto opcode = header->opcode;
        tryRlockCheckBeforeUpdate<folly::Unit>(
            unhandledOpcodes_,
            [&](const auto& unhandledOpcodes) -> std::optional<folly::Unit> {
              if (unhandledOpcodes.find(opcode) != unhandledOpcodes.end()) {
                return folly::unit;
              }
              return std::nullopt;
            },
            [&](auto& unhandledOpcodes) -> folly::Unit {
              XLOG(WARN) << "unhandled fuse opcode " << opcode << "("
                         << fuseOpcodeName(opcode) << ")";
              unhandledOpcodes->insert(opcode);
              return folly::unit;
            });

        // Tell the kernel the operation is not implemented.  Only
        // std::system_error is caught here; on failure the session is asked to
        // exit and this function returns.
        try {
          replyError(*header, ENOSYS);
        } catch (const std::system_error& exc) {
          XLOG(ERR) << "Failed to write error response to fuse: " << exc.what();
          requestSessionExit(StopReason::FUSE_WRITE_ERROR);
          return;
        }
        break;
      }
    }
  }
}