void nfs_client::jukebox_runner()

in turbonfs/src/nfs_client.cpp [412:680]


void nfs_client::jukebox_runner()
{
    AZLogDebug("Started jukebox_runner");

    do {
        int jukebox_requests;

        {
            std::unique_lock<std::mutex> lock(jukebox_seeds_lock_39);
            jukebox_requests = jukebox_seeds.size();
        }

        /*
         * If no jukebox queued, wait more else wait less in order to meet the
         * 5 sec jukebox deadline.
         */
        if (jukebox_requests == 0) {
            ::sleep(5);
        } else {
            ::sleep(1);
        }

        {
            std::unique_lock<std::mutex> lock(jukebox_seeds_lock_39);
            jukebox_requests = jukebox_seeds.size();
            if (jukebox_requests == 0) {
                continue;
            }
        }

        AZLogDebug("jukebox_runner woken up ({} requests in queue)",
                   jukebox_requests);

        /*
         * Go over all queued requests and issue those which are ready to be
         * issued, i.e., they have been queued for more than JUKEBOX_DELAY_SECS
         * seconds. We issue the requests after releasing jukebox_seeds_lock_39.
         */
        std::vector<jukebox_seedinfo *> jsv;
        {
            std::unique_lock<std::mutex> lock(jukebox_seeds_lock_39);
            while (!jukebox_seeds.empty()) {
                struct jukebox_seedinfo *js = jukebox_seeds.front();

                if (js->run_at_msecs > get_current_msecs()) {
                    break;
                }

                jukebox_seeds.pop();

                jsv.push_back(js);
            }
        }

        for (struct jukebox_seedinfo *js : jsv) {
            switch (js->rpc_api->optype) {
                case FUSE_LOOKUP:
                    AZLogWarn("[JUKEBOX REISSUE] LOOKUP(req={}, "
                              "parent_ino={}, name={})",
                              fmt::ptr(js->rpc_api->req),
                              js->rpc_api->lookup_task.get_parent_ino(),
                              js->rpc_api->lookup_task.get_file_name());
                    lookup(js->rpc_api->req,
                           js->rpc_api->lookup_task.get_parent_ino(),
                           js->rpc_api->lookup_task.get_file_name());
                    break;
                case FUSE_ACCESS:
                    AZLogWarn("[JUKEBOX REISSUE] ACCESS(req={}, "
                              "ino={}, mask=0{:03o})",
                              fmt::ptr(js->rpc_api->req),
                              js->rpc_api->access_task.get_ino(),
                              js->rpc_api->access_task.get_mask());
                    access(js->rpc_api->req,
                           js->rpc_api->access_task.get_ino(),
                           js->rpc_api->access_task.get_mask());
                    break;
                case FUSE_GETATTR:
                    AZLogWarn("[JUKEBOX REISSUE] GETATTR(req={}, ino={}, "
                              "fi=null)",
                              fmt::ptr(js->rpc_api->req),
                              js->rpc_api->getattr_task.get_ino());
                    getattr(js->rpc_api->req,
                            js->rpc_api->getattr_task.get_ino(),
                            nullptr);
                    break;
                case FUSE_SETATTR:
                    AZLogWarn("[JUKEBOX REISSUE] SETATTR(req={}, ino={}, "
                              "to_set=0x{:x}, fi={})",
                              fmt::ptr(js->rpc_api->req),
                              js->rpc_api->setattr_task.get_ino(),
                              js->rpc_api->setattr_task.get_attr_flags_to_set(),
                              fmt::ptr(js->rpc_api->setattr_task.get_fuse_file()));
                    setattr(js->rpc_api->req,
                            js->rpc_api->setattr_task.get_ino(),
                            js->rpc_api->setattr_task.get_attr(),
                            js->rpc_api->setattr_task.get_attr_flags_to_set(),
                            js->rpc_api->setattr_task.get_fuse_file());
                    break;
                case FUSE_STATFS:
                    AZLogWarn("[JUKEBOX REISSUE] STATFS(req={}, ino={})",
                              fmt::ptr(js->rpc_api->req),
                              js->rpc_api->statfs_task.get_ino());
                    statfs(js->rpc_api->req,
                           js->rpc_api->statfs_task.get_ino());
                    break;
                case FUSE_CREATE:
                    AZLogWarn("[JUKEBOX REISSUE] CREATE(req={}, parent_ino={},"
                              " name={}, mode=0{:03o}, fi={})",
                              fmt::ptr(js->rpc_api->req),
                              js->rpc_api->create_task.get_parent_ino(),
                              js->rpc_api->create_task.get_file_name(),
                              js->rpc_api->create_task.get_mode(),
                              fmt::ptr(js->rpc_api->create_task.get_fuse_file()));
                    create(js->rpc_api->req,
                           js->rpc_api->create_task.get_parent_ino(),
                           js->rpc_api->create_task.get_file_name(),
                           js->rpc_api->create_task.get_mode(),
                           js->rpc_api->create_task.get_fuse_file());
                    break;
                case FUSE_MKNOD:
                    AZLogWarn("[JUKEBOX REISSUE] MKNOD(req={}, parent_ino={},"
                              " name={}, mode=0{:03o})",
                              fmt::ptr(js->rpc_api->req),
                              js->rpc_api->mknod_task.get_parent_ino(),
                              js->rpc_api->mknod_task.get_file_name(),
                              js->rpc_api->mknod_task.get_mode());
                    mknod(js->rpc_api->req,
                           js->rpc_api->mknod_task.get_parent_ino(),
                           js->rpc_api->mknod_task.get_file_name(),
                           js->rpc_api->mknod_task.get_mode());
                    break;
                case FUSE_MKDIR:
                    AZLogWarn("[JUKEBOX REISSUE] MKDIR(req={}, parent_ino={}, "
                              "name={}, mode=0{:03o})",
                              fmt::ptr(js->rpc_api->req),
                              js->rpc_api->mkdir_task.get_parent_ino(),
                              js->rpc_api->mkdir_task.get_dir_name(),
                              js->rpc_api->mkdir_task.get_mode());
                    mkdir(js->rpc_api->req,
                          js->rpc_api->mkdir_task.get_parent_ino(),
                          js->rpc_api->mkdir_task.get_dir_name(),
                          js->rpc_api->mkdir_task.get_mode());
                    break;
                case FUSE_RMDIR:
                    AZLogWarn("[JUKEBOX REISSUE] RMDIR(req={}, parent_ino={}, "
                              "name={})",
                              fmt::ptr(js->rpc_api->req),
                              js->rpc_api->rmdir_task.get_parent_ino(),
                              js->rpc_api->rmdir_task.get_dir_name());
                    rmdir(js->rpc_api->req,
                          js->rpc_api->rmdir_task.get_parent_ino(),
                          js->rpc_api->rmdir_task.get_dir_name());
                    break;
                case FUSE_UNLINK:
                    AZLogWarn("[JUKEBOX REISSUE] UNLINK(req={}, parent_ino={}, "
                              "name={}, for_silly_rename={})",
                              fmt::ptr(js->rpc_api->req),
                              js->rpc_api->unlink_task.get_parent_ino(),
                              js->rpc_api->unlink_task.get_file_name(),
                              js->rpc_api->unlink_task.get_for_silly_rename());
                    unlink(js->rpc_api->req,
                           js->rpc_api->unlink_task.get_parent_ino(),
                           js->rpc_api->unlink_task.get_file_name(),
                           js->rpc_api->unlink_task.get_for_silly_rename());
                    break;
                case FUSE_SYMLINK:
                    AZLogWarn("[JUKEBOX REISSUE] SYMLINK(req={}, link={}, "
                              "parent_ino={}, name={})",
                              fmt::ptr(js->rpc_api->req),
                              js->rpc_api->symlink_task.get_link(),
                              js->rpc_api->symlink_task.get_parent_ino(),
                              js->rpc_api->symlink_task.get_name());
                    symlink(js->rpc_api->req,
                            js->rpc_api->symlink_task.get_link(),
                            js->rpc_api->symlink_task.get_parent_ino(),
                            js->rpc_api->symlink_task.get_name());
                    break;
                case FUSE_READLINK:
                    AZLogWarn("[JUKEBOX REISSUE] READLINK(req={}, ino={})",
                              fmt::ptr(js->rpc_api->req),
                              js->rpc_api->readlink_task.get_ino());
                    readlink(js->rpc_api->req,
                             js->rpc_api->readlink_task.get_ino());
                    break;
                case FUSE_RENAME:
                    AZLogWarn("[JUKEBOX REISSUE] RENAME(req={}, parent_ino={}, "
                              "name={}, newparent_ino={}, newname={}, "
                              "silly_rename={}, silly_rename_ino={}, "
                              "oldparent_ino={}, oldname={})",
                              fmt::ptr(js->rpc_api->req),
                              js->rpc_api->rename_task.get_parent_ino(),
                              js->rpc_api->rename_task.get_name(),
                              js->rpc_api->rename_task.get_newparent_ino(),
                              js->rpc_api->rename_task.get_newname(),
                              js->rpc_api->rename_task.get_silly_rename(),
                              js->rpc_api->rename_task.get_silly_rename_ino(),
                              js->rpc_api->rename_task.get_oldparent_ino(),
                              js->rpc_api->rename_task.get_oldname());
                    rename(js->rpc_api->req,
                           js->rpc_api->rename_task.get_parent_ino(),
                           js->rpc_api->rename_task.get_name(),
                           js->rpc_api->rename_task.get_newparent_ino(),
                           js->rpc_api->rename_task.get_newname(),
                           js->rpc_api->rename_task.get_silly_rename(),
                           js->rpc_api->rename_task.get_silly_rename_ino(),
                           js->rpc_api->rename_task.get_oldparent_ino(),
                           js->rpc_api->rename_task.get_oldname());
                    break;
                case FUSE_READ:
                    AZLogWarn("[JUKEBOX REISSUE] READ(req={}, ino={}, "
                              "size={}, offset={} fi={})",
                              fmt::ptr(js->rpc_api->req),
                              js->rpc_api->read_task.get_ino(),
                              js->rpc_api->read_task.get_size(),
                              js->rpc_api->read_task.get_offset(),
                              fmt::ptr(js->rpc_api->read_task.get_fuse_file()));
                    jukebox_read(js->rpc_api);
                    break;
                case FUSE_READDIR:
                    AZLogWarn("[JUKEBOX REISSUE] READDIR(req={}, ino={}, "
                              "size={}, off={}, fi={})",
                              fmt::ptr(js->rpc_api->req),
                              js->rpc_api->readdir_task.get_ino(),
                              js->rpc_api->readdir_task.get_size(),
                              js->rpc_api->readdir_task.get_offset(),
                              fmt::ptr(js->rpc_api->readdir_task.get_fuse_file()));
                    readdir(js->rpc_api->req,
                            js->rpc_api->readdir_task.get_ino(),
                            js->rpc_api->readdir_task.get_size(),
                            js->rpc_api->readdir_task.get_offset(),
                            js->rpc_api->readdir_task.get_fuse_file());
                    break;
                case FUSE_READDIRPLUS:
                    AZLogWarn("[JUKEBOX REISSUE] READDIRPLUS(req={}, ino={}, "
                              "size={}, off={}, fi={})",
                              fmt::ptr(js->rpc_api->req),
                              js->rpc_api->readdir_task.get_ino(),
                              js->rpc_api->readdir_task.get_size(),
                              js->rpc_api->readdir_task.get_offset(),
                              fmt::ptr(js->rpc_api->readdir_task.get_fuse_file()));
                    readdirplus(js->rpc_api->req,
                                js->rpc_api->readdir_task.get_ino(),
                                js->rpc_api->readdir_task.get_size(),
                                js->rpc_api->readdir_task.get_offset(),
                                js->rpc_api->readdir_task.get_fuse_file());
                    break;
                case FUSE_WRITE:
                    AZLogWarn("[JUKEBOX REISSUE] WRITE(req={}, ino={})",
                              fmt::ptr(js->rpc_api->req),
                              js->rpc_api->write_task.get_ino());
                    jukebox_write(js->rpc_api);
                    break;
                case FUSE_FLUSH:
                    AZLogWarn("[JUKEBOX REISSUE] COMMIT(req={}, ino={})",
                              fmt::ptr(js->rpc_api->req),
                              js->rpc_api->flush_task.get_ino());
                    jukebox_flush(js->rpc_api);
                    break;
                /* TODO: Add other request types */
                default:
                    AZLogError("Unknown jukebox seed type: {}", (int) js->rpc_api->optype);
                    assert(0);
                    break;
            }

            delete js;
        }
    } while (!shutting_down);
}