in src/replica/replica_2pc.cpp [120:235]
void replica::on_client_write(dsn::message_ex *request, bool ignore_throttling)
{
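    // All write handling runs single-threaded per replica; the checker asserts that.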
_checker.only_one_thread_access();
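    // Reject the write if Ranger access control does not grant this client write permission.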
if (!_access_controller->allowed(request, ranger::access_type::kWrite)) {
response_client_write(request, ERR_ACL_DENY);
return;
}
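    // Client writes may be administratively denied; the deny flags are maintained elsewhere.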
if (_deny_client.write) {
if (_deny_client.reconfig) {
            // Returning ERR_INVALID_STATE triggers the client to update its config immediately.
response_client_write(request, ERR_INVALID_STATE);
return;
}
        // Do not reply to the client at all and let the request time out instead: some users
        // retry immediately whenever they get a failure code, which would put ever more
        // pressure on the server side.
return;
}
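    // Enforce the configured write-size limit; FLAGS_max_allowed_write_size == 0 disables
    // this check.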
if (dsn_unlikely(FLAGS_max_allowed_write_size &&
request->body_size() > FLAGS_max_allowed_write_size)) {
std::string request_info = _app->dump_write_request(request);
        LOG_WARNING_PREFIX(
            "write request body size from client {} exceeds the threshold, request = [{}], "
            "request_body_size = {}, FLAGS_max_allowed_write_size = {}, it will be rejected!",
            request->header->from_address.to_string(),
            request_info,
            request->body_size(),
            FLAGS_max_allowed_write_size);
_stub->_counter_recent_write_size_exceed_threshold_count->increment();
response_client_write(request, ERR_INVALID_DATA);
return;
}
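    // A valid write must carry a registered task code; otherwise no handler exists for it.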
task_spec *spec = task_spec::get(request->rpc_code());
if (dsn_unlikely(nullptr == spec || request->rpc_code() == TASK_CODE_INVALID)) {
LOG_ERROR("recv message with unhandled rpc name {} from {}, trace_id = {}",
request->rpc_code().to_string(),
request->header->from_address.to_string(),
request->header->trace_id);
response_client_write(request, ERR_HANDLER_NOT_FOUND);
return;
}
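    // A duplication master duplicates its writes to remote clusters.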
if (is_duplication_master() && !spec->rpc_request_is_write_idempotent) {
        // Reject non-idempotent writes: duplication provides no atomicity guarantee, so such
        // a write could produce different results on different clusters.
_counter_dup_disabled_non_idempotent_write_count->increment();
response_client_write(request, ERR_OPERATION_DISABLED);
return;
}
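    // CHECK_REQUEST_IF_SPLITTING (defined elsewhere) presumably answers the client and
    // returns early when an in-progress partition split prevents serving this request.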
CHECK_REQUEST_IF_SPLITTING(write)
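    // Only a primary replica serves client writes; as above, ERR_INVALID_STATE prompts the
    // client to refresh its config and locate the current primary.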
if (partition_status::PS_PRIMARY != status()) {
response_client_write(request, ERR_INVALID_STATE);
return;
}
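    // Optionally reject writes when the local disk or any secondary's disk is not in NORMAL
    // status (e.g. space is insufficient).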
if (FLAGS_reject_write_when_disk_insufficient &&
(_dir_node->status != disk_status::NORMAL || _primary_states.secondary_disk_abnormal())) {
response_client_write(request, disk_status_to_error_code(_dir_node->status));
return;
}
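    // While a bulk load ingestion is running, ordinary writes are rejected and a duplicate
    // ingestion request is answered with ERR_NO_NEED_OPERATE.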
if (_is_bulk_load_ingestion) {
if (request->rpc_code() != dsn::apps::RPC_RRDB_RRDB_BULK_LOAD) {
// reject write requests during ingestion
_counter_recent_write_bulk_load_ingestion_reject_count->increment();
response_client_write(request, ERR_OPERATION_DISABLED);
} else {
response_client_write(request, ERR_NO_NEED_OPERATE);
}
return;
}
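    // An incoming ingestion request is only legal while the bulk load is in the DOWNLOADED
    // or INGESTING state, and (checked below) requires every secondary to be alive.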
if (request->rpc_code() == dsn::apps::RPC_RRDB_RRDB_BULK_LOAD) {
auto cur_bulk_load_status = _bulk_loader->get_bulk_load_status();
if (cur_bulk_load_status != bulk_load_status::BLS_DOWNLOADED &&
cur_bulk_load_status != bulk_load_status::BLS_INGESTING) {
LOG_ERROR_PREFIX("receive bulk load ingestion request with wrong status({})",
enum_to_string(cur_bulk_load_status));
response_client_write(request, ERR_INVALID_STATE);
return;
}
LOG_INFO_PREFIX("receive bulk load ingestion request");
        // A bulk load ingestion request requires that all secondaries be alive.
if (static_cast<int>(_primary_states.membership.secondaries.size()) + 1 <
_primary_states.membership.max_replica_count) {
response_client_write(request, ERR_NOT_ENOUGH_MEMBER);
return;
}
_is_bulk_load_ingestion = true;
_bulk_load_ingestion_start_time_ms = dsn_now_ms();
}
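    // Two-phase commit needs a minimum number of live replicas; the primary itself counts as
    // one, hence secondaries + 1.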
if (static_cast<int>(_primary_states.membership.secondaries.size()) + 1 <
_options->app_mutation_2pc_min_replica_count(_app_info.max_replica_count)) {
response_client_write(request, ERR_NOT_ENOUGH_MEMBER);
return;
}
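    // If throttling applies, throttle_write_request() takes over the request (delaying or
    // rejecting it), so processing stops here.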
if (!ignore_throttling && throttle_write_request(request)) {
return;
}
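    // Queue the write. add_work() may batch requests and can return nullptr when no mutation
    // is ready yet; once a mutation is returned, init_prepare() starts the 2PC prepare phase.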
LOG_DEBUG_PREFIX("got write request from {}", request->header->from_address);
auto mu = _primary_states.write_queue.add_work(request->rpc_code(), request, this);
if (mu) {
init_prepare(mu, false);
}
}