void replica::on_client_write()

in src/replica/replica_2pc.cpp [120:235]


void replica::on_client_write(dsn::message_ex *request, bool ignore_throttling)
{
    _checker.only_one_thread_access();

    if (!_access_controller->allowed(request, ranger::access_type::kWrite)) {
        response_client_write(request, ERR_ACL_DENY);
        return;
    }

    if (_deny_client.write) {
        if (_deny_client.reconfig) {
            // return ERR_INVALID_STATE will trigger client update config immediately
            response_client_write(request, ERR_INVALID_STATE);
            return;
        }
        // Do not reply any message to the peer client to let it timeout, it's OK coz some users
        // may retry immediately when they got a not success code which will make the server side
        // pressure more and more heavy.
        return;
    }

    if (dsn_unlikely(FLAGS_max_allowed_write_size &&
                     request->body_size() > FLAGS_max_allowed_write_size)) {
        std::string request_info = _app->dump_write_request(request);
        LOG_WARNING_PREFIX(
            "client from {} write request body size exceed threshold, request = [{}], "
            "request_body_size "
            "= {}, FLAGS_max_allowed_write_size = {}, it will be rejected!",
            request->header->from_address.to_string(),
            request_info,
            request->body_size(),
            FLAGS_max_allowed_write_size);
        _stub->_counter_recent_write_size_exceed_threshold_count->increment();
        response_client_write(request, ERR_INVALID_DATA);
        return;
    }

    task_spec *spec = task_spec::get(request->rpc_code());
    if (dsn_unlikely(nullptr == spec || request->rpc_code() == TASK_CODE_INVALID)) {
        LOG_ERROR("recv message with unhandled rpc name {} from {}, trace_id = {}",
                  request->rpc_code().to_string(),
                  request->header->from_address.to_string(),
                  request->header->trace_id);
        response_client_write(request, ERR_HANDLER_NOT_FOUND);
        return;
    }

    if (is_duplication_master() && !spec->rpc_request_is_write_idempotent) {
        // Ignore non-idempotent write, because duplication provides no guarantee of atomicity to
        // make this write produce the same result on multiple clusters.
        _counter_dup_disabled_non_idempotent_write_count->increment();
        response_client_write(request, ERR_OPERATION_DISABLED);
        return;
    }

    CHECK_REQUEST_IF_SPLITTING(write)

    if (partition_status::PS_PRIMARY != status()) {
        response_client_write(request, ERR_INVALID_STATE);
        return;
    }

    if (FLAGS_reject_write_when_disk_insufficient &&
        (_dir_node->status != disk_status::NORMAL || _primary_states.secondary_disk_abnormal())) {
        response_client_write(request, disk_status_to_error_code(_dir_node->status));
        return;
    }

    if (_is_bulk_load_ingestion) {
        if (request->rpc_code() != dsn::apps::RPC_RRDB_RRDB_BULK_LOAD) {
            // reject write requests during ingestion
            _counter_recent_write_bulk_load_ingestion_reject_count->increment();
            response_client_write(request, ERR_OPERATION_DISABLED);
        } else {
            response_client_write(request, ERR_NO_NEED_OPERATE);
        }
        return;
    }

    if (request->rpc_code() == dsn::apps::RPC_RRDB_RRDB_BULK_LOAD) {
        auto cur_bulk_load_status = _bulk_loader->get_bulk_load_status();
        if (cur_bulk_load_status != bulk_load_status::BLS_DOWNLOADED &&
            cur_bulk_load_status != bulk_load_status::BLS_INGESTING) {
            LOG_ERROR_PREFIX("receive bulk load ingestion request with wrong status({})",
                             enum_to_string(cur_bulk_load_status));
            response_client_write(request, ERR_INVALID_STATE);
            return;
        }
        LOG_INFO_PREFIX("receive bulk load ingestion request");

        // bulk load ingestion request requires that all secondaries should be alive
        if (static_cast<int>(_primary_states.membership.secondaries.size()) + 1 <
            _primary_states.membership.max_replica_count) {
            response_client_write(request, ERR_NOT_ENOUGH_MEMBER);
            return;
        }
        _is_bulk_load_ingestion = true;
        _bulk_load_ingestion_start_time_ms = dsn_now_ms();
    }

    if (static_cast<int>(_primary_states.membership.secondaries.size()) + 1 <
        _options->app_mutation_2pc_min_replica_count(_app_info.max_replica_count)) {
        response_client_write(request, ERR_NOT_ENOUGH_MEMBER);
        return;
    }

    if (!ignore_throttling && throttle_write_request(request)) {
        return;
    }

    LOG_DEBUG_PREFIX("got write request from {}", request->header->from_address);
    auto mu = _primary_states.write_queue.add_work(request->rpc_code(), request, this);
    if (mu) {
        init_prepare(mu, false);
    }
}