void hf3fs_write()

in src/fuse/FuseOps.cc [1552:1689]


// FUSE low-level `write` handler: writes `size` bytes from `buf` at offset `off`
// into the inode behind `fino`.
//
// Early exits:
//  - user config `dryrun_bench_mode`: replies success for `size` bytes without writing;
//  - user config `readonly`: replies EROFS.
//
// Direct mode (handle opened with O_DIRECT, or `io_bufs.write_buf_size` == 0):
//  any data still pending in the inode's write buffer is flushed first, then `buf`
//  is copied into an IO buffer from `d.bufPool` and written via `flushBuf`; on a
//  non-negative result the request is answered with that result.
//
// Buffered mode: data is appended to the per-inode write buffer `pi->writeBuf`
//  (allocated and registered with the storage client on first use). The buffer is
//  flushed when this write is not contiguous with the buffered range and whenever
//  it becomes full. Any remainder stays buffered and the inode is added to
//  `dirtyInodes` for a background flush. The request is answered with `size`.
//
// Whenever `flushBuf` returns a negative value, this function returns without
// replying. Presumably `flushBuf` has already replied with the error -- TODO
// confirm against flushBuf.
void hf3fs_write(fuse_req_t req, fuse_ino_t fino, const char *buf, size_t size, off_t off, struct fuse_file_info *fi) {
  static auto mountName = d.fuseRemountPref.value_or(d.fuseMountpoint).native();
  static monitor::LatencyRecorder writeLatency("fuse.write.latency", monitor::TagSet{{"mount_name", mountName}});
  static monitor::DistributionRecorder writeDist("fuse.write.size", monitor::TagSet{{"mount_name", mountName}});

  // Caller credentials; only read before any reply is sent.
  const auto *ctx = fuse_req_ctx(req);
  auto uid = ctx->uid;
  auto uids = std::to_string(uid);

  auto start = SteadyClock::now();

  auto userInfo = UserInfo(flat::Uid(uid), flat::Gid(ctx->gid), d.fuseToken);
  // Look the per-user config up once; both checks below read from it.
  auto &&userCfg = d.userConfig.getConfig(userInfo);
  if (userCfg.dryrun_bench_mode()) {
    // Benchmark mode: acknowledge the write without touching storage.
    fuse_reply_write(req, size);
    return;
  }

  if (userCfg.readonly()) {
    fuse_reply_err(req, EROFS);
    return;
  }

  writeDist.addSample(size, monitor::TagSet{{"uid", uids}});

  auto ino = real_ino(fino);

  XLOGF(OP_LOG_LEVEL,
        "hf3fs_write(ino={}, buf={}, size={}, off={}, pid={})",
        ino,
        reinterpret_cast<uintptr_t>(buf),
        size,
        off,
        ctx->pid);
  record("write", uid);

  auto odir = reinterpret_cast<FileHandle *>(fi->fh)->oDirect;
  auto pi = inodeOf(*fi, ino);

  // wbMtx guards pi->writeBuf and the buffer's off/len/contents.
  std::unique_lock lock(pi->wbMtx);
  auto wb = pi->writeBuf;
  if (odir || !d.config->io_bufs().write_buf_size()) {
    if (!wb) {
      // Nothing has ever been buffered for this inode; the direct write proceeds unlocked.
      lock.unlock();
    } else if (wb->len) {
      // Flush previously buffered bytes before this direct write is issued.
      // `lock` stays held through the direct write below in this case.
      XLOGF(DBG, "to flush for o_direct fd off {} prev off {} len {}", off, wb->off, wb->len);
      if (flushBuf(req, pi, wb->off, *wb->memh, wb->len, true) < 0) {
        wb->len = 0;
        return;
      }

      wb->len = 0;
    }

    // NOTE(review): all `size` bytes are copied into a single pool buffer; this
    // assumes the pool's buffer size is >= the largest FUSE write -- confirm.
    auto memh = IOBuffer(folly::coro::blocking_wait(d.bufPool->allocate()));

    memcpy((char *)memh.data(), buf, size);
    auto ret = flushBuf(req, pi, off, memh, size, false);

    writeLatency.addSample(SteadyClock::now() - start, monitor::TagSet{{"uid", uids}});

    if (ret >= 0) {
      fuse_reply_write(req, ret);
    }
    return;
  }

  if (!wb) {
    // First buffered write to this inode: allocate the buffer and register it for IO.
    auto newWb = std::make_shared<InodeWriteBuf>();
    newWb->buf.resize(d.config->io_bufs().write_buf_size());
    auto ret = d.storageClient->registerIOBuffer(newWb->buf.data(), newWb->buf.size());
    if (!ret) {
      handle_error(req, ret);
      return;
    }
    newWb->memh.reset(new storage::client::IOBuffer(std::move(*ret)));
    pi->writeBuf = newWb;
    wb = std::move(newWb);
  }

  // The buffer holds one contiguous range [wb->off, wb->off + wb->len); a write
  // that does not start right after it forces the pending range out first.
  if (wb->len && wb->off + static_cast<ssize_t>(wb->len) != off) {
    XLOGF(DBG, "to flush due to inconsecutive off {} prev off {} len {}", off, wb->off, wb->len);
    if (flushBuf(req, pi, wb->off, *wb->memh, wb->len, true) < 0) {
      wb->len = 0;
      return;
    }

    wb->len = 0;
  }

  if (!wb->len) {
    wb->off = off;
  }

  // Copy into the buffer, flushing each time it fills up.
  size_t done = 0;
  do {
    auto cplen = std::min(size - done, wb->buf.size() - wb->len);
    memcpy(wb->buf.data() + wb->len, buf + done, cplen);
    wb->len += cplen;
    done += cplen;
    if (wb->len == wb->buf.size()) {
      XLOGF(DBG, "flush full buf {}", wb->len);
      if (flushBuf(req, pi, wb->off, *wb->memh, wb->len, true) < 0) {
        wb->len = 0;
        return;
      }

      wb->len = 0;
      // Everything up to `off + done` is now flushed; the next byte lands there.
      wb->off = off + done;
    }
  } while (done < size);

  if (wb->len) {
    // notify background task to flush buf and sync
    getFuseClientsInstance().dirtyInodes.lock()->insert(ino);
  }

  writeLatency.addSample(SteadyClock::now() - start, monitor::TagSet{{"uid", uids}});
  fuse_reply_write(req, size);
}