src/nfs/nfs_client_impl.cpp (469 lines of code) (raw):
/*
* The MIT License (MIT)
*
* Copyright (c) 2015 Microsoft Corporation
*
* -=- Robust Distributed System Nucleus (rDSN) -=-
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
#include "nfs_client_impl.h"
#include <cstdint>
// IWYU pragma: no_include <ext/alloc_traits.h>
#include <mutex>
#include <string_view>
#include "fmt/core.h"
#include "nfs/nfs_code_definition.h"
#include "nfs/nfs_node.h"
#include "nlohmann/json.hpp"
#include "rpc/dns_resolver.h" // IWYU pragma: keep
#include "rpc/rpc_host_port.h"
#include "utils/blob.h"
#include "utils/command_manager.h"
#include "utils/filesystem.h"
#include "utils/flags.h"
#include "utils/fmt_logging.h"
#include "utils/token_buckets.h"
// Upper bound, in bytes, of the payload requested by a single remote copy RPC. Files larger
// than this are split into several copy requests in end_get_file_size().
DSN_DEFINE_uint32(nfs,
nfs_copy_block_bytes,
4 * 1024 * 1024,
"max block size (bytes) for each network copy");
// Description text shared by the flag definition below and by the CLI command registered in
// register_cli_commands().
static const char *kMaxCopyRateMegaBytesPerDiskDesc =
"The maximum bandwidth (MB/s) of writing data per local disk when copying from remote node, 0 "
"means no limit";
// Read in continue_copy(): when greater than 0, each copy request consumes tokens from the
// token bucket of its destination disk before the RPC is issued.
DSN_DEFINE_int64(nfs, max_copy_rate_megabytes_per_disk, 0, kMaxCopyRateMegaBytesPerDiskDesc);
// NOTE(review): FT_MUTABLE presumably marks the flag as changeable at runtime — confirm
// against utils/flags.h.
DSN_TAG_VARIABLE(max_copy_rate_megabytes_per_disk, FT_MUTABLE);
// Validates a candidate value for 'max_copy_rate_megabytes_per_disk'.
//
// A value is accepted when it is 0 (no limit) or when, converted to bytes, it is strictly
// greater than FLAGS_nfs_copy_block_bytes.
//
// Negative values and values whose conversion to bytes does not fit into int64_t are rejected
// up front: shifting them left by 20 bits is not well defined for a signed integer, and the
// same conversion is performed again in continue_copy().
bool check_max_copy_rate_megabytes_per_disk(int64_t value)
{
    if (value == 0) {
        // 0 means "no limit".
        return true;
    }

    // Largest number of megabytes that can still be expressed in bytes as an int64_t.
    constexpr int64_t kMaxMegabytes = INT64_MAX >> 20;
    if (value < 0 || value > kMaxMegabytes) {
        return false;
    }

    return (value << 20) > FLAGS_nfs_copy_block_bytes;
}
// Group validator for 'max_copy_rate_megabytes_per_disk': checks the configured value against
// 'nfs_copy_block_bytes' and, on failure, explains the constraint through `message`.
DSN_DEFINE_group_validator(max_copy_rate_megabytes_per_disk, [](std::string &message) -> bool {
    if (check_max_copy_rate_megabytes_per_disk(FLAGS_max_copy_rate_megabytes_per_disk)) {
        return true;
    }
    message = fmt::format("[nfs] max_copy_rate_megabytes_per_disk({}) should be 0 or, converted "
                          "to bytes, greater than nfs_copy_block_bytes({})",
                          FLAGS_max_copy_rate_megabytes_per_disk,
                          FLAGS_nfs_copy_block_bytes);
    return false;
});
// Upper bound on the number of in-flight remote copy RPCs; enforced in continue_copy().
DSN_DEFINE_int32(nfs,
max_concurrent_remote_copy_requests,
50,
"max concurrent remote copy to the same server on nfs client");
// Upper bound on the number of in-flight local file writes; enforced in continue_write().
DSN_DEFINE_int32(nfs, max_concurrent_local_writes, 50, "max local file writes on nfs client");
// Once this many received blocks are waiting to be written, continue_copy() stops issuing new
// remote copies.
DSN_DEFINE_int32(nfs, max_buffered_local_writes, 500, "max buffered file writes on nfs client");
// Number of high-priority requests continue_copy() may pop in a row before it has to try the
// low-priority queue; the budget is refilled each time a low-priority request is popped.
DSN_DEFINE_int32(nfs,
high_priority_speed_rate,
2,
"the copy speed rate of high priority comparing with low priority on nfs client");
// Not referenced in this file; according to its description it concerns the nfs server.
DSN_DEFINE_int32(nfs,
file_close_expire_time_ms,
60 * 1000,
"max idle time for an opening file on nfs server");
// Not referenced in this file; according to its name it concerns the nfs server.
DSN_DEFINE_int32(nfs,
file_close_timer_interval_ms_on_server,
30 * 1000,
"time interval for checking whether cached file handles need to be closed");
// Passed to the constructor of the low-priority request queue (_copy_requests_low).
// The two description literals are concatenated by the compiler, so the first one must end
// with a space.
DSN_DEFINE_int32(nfs,
                 max_file_copy_request_count_per_file,
                 2,
                 "maximum concurrent remote copy requests for the same file on nfs client "
                 "to limit each file copy speed");
// Initial retry budget given to every copy request created in end_get_file_size().
DSN_DEFINE_int32(nfs, max_retry_count_per_copy_request, 2, "maximum retry count when copy failed");
// Timeout applied to both the get-file-size RPC and the copy RPCs. The default is written as
// an integer expression (like the other millisecond flags) rather than a floating-point
// literal, so no implicit double-to-int conversion is involved.
DSN_DEFINE_int32(nfs,
                 rpc_timeout_ms,
                 100 * 1000, // 100s
                 "rpc timeout in milliseconds for nfs copy, "
                 "0 means use default timeout of rpc engine");
// Increased by the response size in end_copy() for every successful remote copy.
METRIC_DEFINE_counter(server,
nfs_client_copy_bytes,
dsn::metric_unit::kBytes,
"The accumulated data size in bytes requested by client during nfs copy");
// Incremented in end_copy() for every failed remote copy, including attempts that are
// retried afterwards.
METRIC_DEFINE_counter(server,
nfs_client_copy_failed_requests,
dsn::metric_unit::kRequests,
"The number of failed nfs copy requests (requested by client)");
// Increased by the written size in end_write() for every successful local write.
METRIC_DEFINE_counter(
server,
nfs_client_write_bytes,
dsn::metric_unit::kBytes,
"The accumulated data size in bytes that are written to local file in client");
// Incremented in end_write() for every failed local write.
METRIC_DEFINE_counter(server,
nfs_client_failed_writes,
dsn::metric_unit::kWrites,
"The number of failed writes to local file in client");
namespace dsn {
namespace service {
// NOTE(review): nothing in the visible part of this file reads or writes this variable; it
// looks like a leftover — confirm and consider removing it.
static uint32_t current_max_copy_rate_megabytes = 0;
// Zeroes the concurrency counters, sizes the low-priority queue and the high-priority budget
// from their flags, binds the metric variables, creates the token buckets consulted by
// continue_copy() and registers the CLI command.
nfs_client_impl::nfs_client_impl()
: _concurrent_copy_request_count(0),
_concurrent_local_write_count(0),
_buffered_local_write_count(0),
_copy_requests_low(FLAGS_max_file_copy_request_count_per_file),
_high_priority_remaining_time(FLAGS_high_priority_speed_rate),
METRIC_VAR_INIT_server(nfs_client_copy_bytes),
METRIC_VAR_INIT_server(nfs_client_copy_failed_requests),
METRIC_VAR_INIT_server(nfs_client_write_bytes),
METRIC_VAR_INIT_server(nfs_client_failed_writes)
{
_copy_token_buckets = std::make_unique<utils::token_buckets>();
// Guarded by a std::once_flag inside, so only the first instance performs the registration.
register_cli_commands();
}
// Cancels the tasks that are still registered with _tracker (the local writes started in
// continue_write() are attached to it).
nfs_client_impl::~nfs_client_impl()
{
    _tracker.cancel_outstanding_tasks();
}
void nfs_client_impl::begin_remote_copy(std::shared_ptr<remote_copy_request> &rci,
aio_task *nfs_task)
{
user_request_ptr req(new user_request());
req->high_priority = rci->high_priority;
SET_IP_AND_HOST_PORT_BY_DNS(req->file_size_req, source, rci->source);
req->file_size_req.dst_dir = rci->dest_dir;
req->file_size_req.file_list = rci->files;
req->file_size_req.source_dir = rci->source_dir;
req->file_size_req.overwrite = rci->overwrite;
req->file_size_req.__set_source_disk_tag(rci->source_disk_tag);
req->file_size_req.__set_dest_disk_tag(rci->dest_disk_tag);
req->file_size_req.__set_pid(rci->pid);
req->nfs_task = nfs_task;
req->is_finished = false;
async_nfs_get_file_size(
req->file_size_req,
[=](error_code err, get_file_size_response &&resp) {
end_get_file_size(err, std::move(resp), req);
},
std::chrono::milliseconds(FLAGS_rpc_timeout_ms),
req->file_size_req.source);
}
// Callback of the get-file-size RPC issued by begin_remote_copy().
//
// err:  transport-level result of the RPC.
// resp: reply of the source node; carries its own error code plus the file and size lists.
// ureq: the user request the reply belongs to.
//
// On failure (either the RPC itself or the error carried in the reply) the user's task is
// completed with that error. On success every reported file is split into copy requests of at
// most FLAGS_nfs_copy_block_bytes bytes, the requests are queued according to the priority of
// the user request, and continue_copy() is scheduled. When the source reports no file at all,
// the user request is completed right away, because no copy/write callback would ever do it.
void nfs_client_impl::end_get_file_size(::dsn::error_code err,
                                        const ::dsn::service::get_file_size_response &resp,
                                        const user_request_ptr &ureq)
{
    // A transport-level error takes precedence over the error carried in the response.
    if (err == ::dsn::ERR_OK) {
        err = dsn::error_code(resp.error);
    }
    if (err != ::dsn::ERR_OK) {
        LOG_ERROR("[nfs_service] remote get file size failed, source = {}, dir = {}, err = {}",
                  FMT_HOST_PORT_AND_IP(ureq->file_size_req, source),
                  ureq->file_size_req.source_dir,
                  err);
        ureq->nfs_task->enqueue(err, 0);
        return;
    }

    // A zero block size would divide by zero below and never make progress.
    const uint64_t block_bytes = FLAGS_nfs_copy_block_bytes;
    CHECK(block_bytes > 0, "nfs_copy_block_bytes must be greater than 0");

    std::deque<copy_request_ex_ptr> copy_requests;
    // NOTE(review): assumes resp.file_list has at least as many entries as resp.size_list —
    // confirm against the server implementation.
    ureq->file_contexts.resize(resp.size_list.size());
    for (size_t i = 0; i < resp.size_list.size(); i++) {
        file_context_ptr filec(new file_context(ureq, resp.file_list[i], resp.size_list[i]));
        ureq->file_contexts[i] = filec;

        // Split the file into block-sized requests. An empty file still gets exactly one
        // (zero-sized) request.
        uint64_t remaining = resp.size_list[i];
        uint64_t req_offset = 0;
        int idx = 0;
        filec->copy_requests.reserve(remaining / block_bytes + 1);
        do {
            const uint32_t req_size =
                static_cast<uint32_t>(remaining > block_bytes ? block_bytes : remaining);

            copy_request_ex_ptr req(
                new copy_request_ex(filec, idx++, FLAGS_max_retry_count_per_copy_request));
            req->offset = req_offset;
            req->size = req_size;
            req->is_last = (remaining <= req_size);

            filec->copy_requests.push_back(req);
            copy_requests.push_back(req);

            req_offset += req_size;
            remaining -= req_size;
        } while (remaining > 0);
    }

    if (copy_requests.empty()) {
        // Every file yields at least one request, so this means the source reported no file.
        // Nothing will ever reach end_copy()/end_write() for this user request; complete it now
        // instead of leaving its task pending forever.
        handle_completion(ureq, ::dsn::ERR_OK);
    } else {
        zauto_lock l(_copy_requests_lock);
        if (ureq->high_priority) {
            _copy_requests_high.insert(
                _copy_requests_high.end(), copy_requests.begin(), copy_requests.end());
        } else {
            _copy_requests_low.push(std::move(copy_requests));
        }
    }

    tasking::enqueue(
        LPC_NFS_COPY_FILE, nullptr, [this]() { continue_copy(); }, 0);
}
// Dispatches queued copy requests as remote copy RPCs.
//
// Returns immediately when too many received blocks are still waiting to be written.
// Otherwise it keeps popping requests and issuing RPCs until the queues are empty or the limit
// of concurrent remote copies is reached. _concurrent_copy_request_count is incremented
// *before* a request is popped in order to reserve a slot; the reservation is given back on
// every path that does not end up issuing an RPC.
void nfs_client_impl::continue_copy()
{
if (_buffered_local_write_count >= FLAGS_max_buffered_local_writes) {
// exceed max_buffered_local_writes limit, pause.
// the copy task will be triggered by continue_copy() invoked in end_write().
return;
}
// reserve a slot for the first request of this invocation.
if (++_concurrent_copy_request_count > FLAGS_max_concurrent_remote_copy_requests) {
// exceed max_concurrent_remote_copy_requests limit, pause.
// the copy task will be triggered by continue_copy() invoked in end_copy().
--_concurrent_copy_request_count;
return;
}
copy_request_ex_ptr req = nullptr;
while (true) {
{
// pick the next request under the queue lock.
zauto_lock l(_copy_requests_lock);
if (_high_priority_remaining_time > 0 && !_copy_requests_high.empty()) {
// pop from high queue
req = _copy_requests_high.front();
_copy_requests_high.pop_front();
--_high_priority_remaining_time;
} else {
// try to pop from low queue
req = _copy_requests_low.pop();
if (req) {
// a low priority request got its turn, refill the high priority budget.
_high_priority_remaining_time = FLAGS_high_priority_speed_rate;
}
}
if (!req && !_copy_requests_high.empty()) {
// pop from low queue failed, then pop from high priority,
// but not change the _high_priority_remaining_time
req = _copy_requests_high.front();
_copy_requests_high.pop_front();
}
if (req) {
++req->file_ctx->user_req->concurrent_copy_count;
} else {
// no copy request
--_concurrent_copy_request_count;
break;
}
}
{
zauto_lock l(req->lock);
const user_request_ptr &ureq = req->file_ctx->user_req;
if (req->is_valid) {
if (FLAGS_max_copy_rate_megabytes_per_disk > 0) {
// rate limit per destination disk; the flag is in MB/s, hence the shift to bytes.
// NOTE(review): judging by its name the call may wait for tokens, and it runs while
// req->lock is held — confirm this is intended.
_copy_token_buckets->get_token_bucket(ureq->file_size_req.dest_disk_tag)
->consumeWithBorrowAndWait(
req->size,
FLAGS_max_copy_rate_megabytes_per_disk << 20,
1.5 * (FLAGS_max_copy_rate_megabytes_per_disk << 20));
}
// build the copy RPC from the user request and this block's offset/size.
copy_request copy_req;
SET_OBJ_IP_AND_HOST_PORT(copy_req, source, ureq->file_size_req, source);
copy_req.file_name = req->file_ctx->file_name;
copy_req.offset = req->offset;
copy_req.size = req->size;
copy_req.dst_dir = ureq->file_size_req.dst_dir;
copy_req.source_dir = ureq->file_size_req.source_dir;
copy_req.overwrite = ureq->file_size_req.overwrite;
copy_req.is_last = req->is_last;
copy_req.__set_source_disk_tag(ureq->file_size_req.source_disk_tag);
copy_req.__set_pid(ureq->file_size_req.pid);
req->remote_copy_task = async_nfs_copy(
copy_req,
[=](error_code err, copy_response &&resp) {
end_copy(err, std::move(resp), req);
// reset task to release memory quickly.
// should do this after end_copy() done.
if (req->is_ready_for_write) {
// tsk is declared before the lock, so the task reference it takes over is
// dropped after the lock has been released.
::dsn::task_ptr tsk;
zauto_lock l(req->lock);
tsk = std::move(req->remote_copy_task);
}
},
std::chrono::milliseconds(FLAGS_rpc_timeout_ms),
req->file_ctx->user_req->file_size_req.source);
} else {
// the request was invalidated (see handle_completion()); drop it and give back both
// the per-user-request count and the slot reserved for it.
--ureq->concurrent_copy_count;
--_concurrent_copy_request_count;
}
}
// reserve a slot for the next request before popping it.
if (++_concurrent_copy_request_count > FLAGS_max_concurrent_remote_copy_requests) {
// exceed max_concurrent_remote_copy_requests limit, pause.
// the copy task will be triggered by continue_copy() invoked in end_copy().
--_concurrent_copy_request_count;
break;
}
}
}
// Callback of a remote copy RPC issued by continue_copy().
//
// err:  transport-level result of the RPC.
// resp: reply of the source node; carries its own error code and, on success, the data.
// reqc: the copy request the reply belongs to.
//
// On failure the request is put back into its queue while it still has retries left;
// otherwise the whole user request is completed with the error. On success the reply is
// stored in the request and every block of the file that has become writable in order is
// appended to the local write queue. In all cases copying and writing are resumed afterwards.
void nfs_client_impl::end_copy(::dsn::error_code err,
                               const copy_response &resp,
                               const copy_request_ex_ptr &reqc)
{
    // Give back what continue_copy() took when it issued this request.
    --_concurrent_copy_request_count;
    --reqc->file_ctx->user_req->concurrent_copy_count;

    const file_context_ptr &fc = reqc->file_ctx;

    // A transport-level error takes precedence over the error carried in the response.
    if (err == ERR_OK) {
        err = resp.error;
    }

    if (err != ::dsn::ERR_OK) {
        METRIC_VAR_INCREMENT(nfs_client_copy_failed_requests);

        // Once the user request is finished there is nothing left to retry or report.
        if (!fc->user_req->is_finished) {
            if (reqc->retry_count > 0) {
                LOG_WARNING("[nfs_service] remote copy failed, source = {}, dir = {}, file = {}, "
                            "err = {}, retry_count = {}",
                            FMT_HOST_PORT_AND_IP(fc->user_req->file_size_req, source),
                            fc->user_req->file_size_req.source_dir,
                            fc->file_name,
                            err,
                            reqc->retry_count);

                // retry copy
                reqc->retry_count--;

                // put back into copy request queue
                zauto_lock l(_copy_requests_lock);
                if (fc->user_req->high_priority) {
                    _copy_requests_high.push_front(reqc);
                } else {
                    _copy_requests_low.push_retry(reqc);
                }
            } else {
                LOG_ERROR("[nfs_service] remote copy failed, source = {}, dir = {}, file = {}, "
                          "err = {}, retry_count = {}",
                          FMT_HOST_PORT_AND_IP(fc->user_req->file_size_req, source),
                          fc->user_req->file_size_req.source_dir,
                          fc->file_name,
                          err,
                          reqc->retry_count);

                handle_completion(fc->user_req, err);
            }
        }
    } else {
        METRIC_VAR_INCREMENT_BY(nfs_client_copy_bytes, resp.size);

        reqc->response = resp;
        reqc->is_ready_for_write = true;

        // prepare write requests: blocks of a file are handed to the writer strictly in index
        // order, so only a block directly following the last handed-over one opens the gate,
        // and it takes along all following blocks that are already ready.
        std::deque<copy_request_ex_ptr> new_writes;
        {
            zauto_lock l(fc->user_req->user_req_lock);
            if (!fc->user_req->is_finished && fc->current_write_index == reqc->index - 1) {
                const int request_count = static_cast<int>(fc->copy_requests.size());
                for (int i = reqc->index; i < request_count; i++) {
                    if (!fc->copy_requests[i]->is_ready_for_write) {
                        break;
                    }
                    fc->current_write_index++;
                    new_writes.push_back(fc->copy_requests[i]);
                }
            }
        }

        // put write requests into queue
        if (!new_writes.empty()) {
            zauto_lock l(_local_writes_lock);
            _local_writes.insert(_local_writes.end(), new_writes.begin(), new_writes.end());
            _buffered_local_write_count += new_writes.size();
        }
    }

    continue_copy();
    continue_write();
}
// Starts at most one local file write per invocation.
//
// Reserves a write slot, pops the next still-valid block from the local write queue, makes
// sure the destination directory exists and the destination file is open, and issues the
// asynchronous write whose completion is handled by end_write(). The reserved slot is given
// back on every path that does not end up issuing a write.
void nfs_client_impl::continue_write()
{
// check write quota
if (++_concurrent_local_write_count > FLAGS_max_concurrent_local_writes) {
// exceed max_concurrent_local_writes limit, pause.
// the write task will be triggered by continue_write() invoked in
// end_write().
--_concurrent_local_write_count;
return;
}
// get write data
copy_request_ex_ptr reqc;
while (true) {
{
zauto_lock l(_local_writes_lock);
if (!_local_writes.empty()) {
reqc = _local_writes.front();
_local_writes.pop_front();
--_buffered_local_write_count;
} else {
// no write data
reqc = nullptr;
break;
}
}
{
// only process valid request, and discard invalid request
zauto_lock l(reqc->lock);
if (reqc->is_valid) {
break;
}
}
}
if (nullptr == reqc) {
--_concurrent_local_write_count;
return;
}
// real write
const file_context_ptr &fc = reqc->file_ctx;
std::string file_path =
dsn::utils::filesystem::path_combine(fc->user_req->file_size_req.dst_dir, fc->file_name);
std::string path = dsn::utils::filesystem::remove_file_name(file_path.c_str());
// failing to create the destination directory is treated as fatal.
CHECK(dsn::utils::filesystem::create_directory(path), "create directory {} failed", path);
// open the destination file on first use: test without the lock, then test again under
// user_req_lock before opening.
if (!fc->file_holder->file_handle) {
// double check
zauto_lock l(fc->user_req->user_req_lock);
if (!fc->file_holder->file_handle) {
fc->file_holder->file_handle = file::open(file_path, file::FileOpenType::kWriteOnly);
}
}
if (!fc->file_holder->file_handle) {
--_concurrent_local_write_count;
LOG_ERROR("open file {} failed", file_path);
handle_completion(fc->user_req, ERR_FILE_OPERATION_FAILED);
} else {
LOG_DEBUG("nfs: copy to file {} [{}, {}]",
file_path,
reqc->response.offset,
reqc->response.offset + reqc->response.size);
zauto_lock l(reqc->lock);
// the request may have been invalidated since it was popped, so check again under its lock.
if (reqc->is_valid) {
reqc->local_write_task = file::write(fc->file_holder->file_handle,
reqc->response.file_content.data(),
reqc->response.size,
reqc->response.offset,
LPC_NFS_WRITE,
&_tracker,
[=](error_code err, int sz) {
end_write(err, sz, reqc);
// reset task to release memory quickly.
// should do this after end_write()
// done.
{
::dsn::task_ptr tsk;
zauto_lock l(reqc->lock);
tsk = std::move(reqc->local_write_task);
}
});
} else {
--_concurrent_local_write_count;
}
}
}
// Completion callback of a local write started by continue_write().
//
// err:  result of the write.
// sz:   number of bytes reported as written.
// reqc: the copy request whose data has been written.
//
// A failed write completes the whole user request with the error. A successful write counts
// towards its file; when the last block of a file has been written the file is released, and
// when that was the last file the user request is completed. Writing and copying are resumed
// afterwards in either case.
void nfs_client_impl::end_write(error_code err, size_t sz, const copy_request_ex_ptr &reqc)
{
    --_concurrent_local_write_count;

    // The data is on disk (or lost); drop the buffer right away to free memory.
    reqc->response.file_content = blob();

    const file_context_ptr &file_ctx = reqc->file_ctx;
    const user_request_ptr &ureq = file_ctx->user_req;
    bool should_complete = false;

    if (err == ERR_OK) {
        METRIC_VAR_INCREMENT_BY(nfs_client_write_bytes, sz);

        // Declared before the lock on purpose: it is destroyed after the lock is released, so
        // the file gets closed outside of the critical section.
        file_wrapper_ptr closing_file;
        zauto_lock guard(ureq->user_req_lock);
        if (!ureq->is_finished) {
            const int segment_count = static_cast<int>(file_ctx->copy_requests.size());
            if (++file_ctx->finished_segments == segment_count) {
                // All blocks of this file are written; let go of the file so that it is closed
                // immediately.
                closing_file = std::move(file_ctx->file_holder);

                const int file_count = static_cast<int>(ureq->file_contexts.size());
                should_complete = (++ureq->finished_files == file_count);
            }
        }
    } else {
        METRIC_VAR_INCREMENT(nfs_client_failed_writes);
        LOG_ERROR("[nfs_service] local write failed, dir = {}, file = {}, err = {}",
                  ureq->file_size_req.dst_dir,
                  file_ctx->file_name,
                  err);
        should_complete = true;
    }

    if (should_complete) {
        handle_completion(ureq, err);
    }

    continue_write();
    continue_copy();
}
void nfs_client_impl::handle_completion(const user_request_ptr &req, error_code err)
{
// ATTENTION: only here we may lock for two level (user_req_lock -> copy_request_ex.lock)
zauto_lock l(req->user_req_lock);
// make sure this function can only be executed for once
if (req->is_finished)
return;
req->is_finished = true;
size_t total_size = 0;
for (file_context_ptr &fc : req->file_contexts) {
total_size += fc->file_size;
if (err != ERR_OK) {
// mark all copy_requests to be invalid
for (const copy_request_ex_ptr &rc : fc->copy_requests) {
zauto_lock l(rc->lock);
rc->is_valid = false;
}
}
// clear copy_requests to break circle reference
fc->copy_requests.clear();
}
// clear file_contexts to break circle reference
req->file_contexts.clear();
// notify aio_task
req->nfs_task->enqueue(err, err == ERR_OK ? total_size : 0);
}
// todo(jiashuo1) just for compatibility with scripts, such as
// https://github.com/apache/incubator-pegasus/blob/v2.3/scripts/pegasus_offline_node_list.sh
void nfs_client_impl::register_cli_commands()
{
static std::once_flag flag;
std::call_once(flag, [&]() {
_nfs_max_copy_rate_megabytes_cmd = dsn::command_manager::instance().register_int_command(
FLAGS_max_copy_rate_megabytes_per_disk,
FLAGS_max_copy_rate_megabytes_per_disk,
"nfs.max_copy_rate_megabytes_per_disk",
fmt::format("{}, "
"should be greater than 'nfs_copy_block_bytes' which is {}",
kMaxCopyRateMegaBytesPerDiskDesc,
FLAGS_nfs_copy_block_bytes),
&check_max_copy_rate_megabytes_per_disk);
});
}
} // namespace service
} // namespace dsn