void replica::on_prepare_reply()

in src/replica/replica_2pc.cpp [637:782]


// Primary-side callback for the reply to a prepare RPC that this replica sent to a
// secondary or potential secondary (learner) for mutation `pr.first`.
//
// Parameters:
//   pr      - the mutation, plus the partition status the target node was addressed as
//             (`target_status`); only PS_SECONDARY and PS_POTENTIAL_SECONDARY are acted on.
//   err     - RPC-level error. When it is not ERR_OK, `reply` is not read and `err`
//             becomes the ack's error.
//   request - the outgoing prepare request; its `to_address` identifies the target node.
//   reply   - the response; unmarshalled into a `prepare_ack` only when `err == ERR_OK`.
//
// Behavior:
//   - Ignored if this replica is no longer primary, the mutation's ballot is older than
//     the current ballot, or the mutation is already committed.
//   - On a successful ack, the matching left-ack counter on the mutation is decremented;
//     when it reaches zero, do_possible_commit_on_primary() is invoked.
//   - On ERR_INACTIVE_STATE / ERR_TRY_AGAIN, the prepare is re-sent after a short delay,
//     unless the mutation is close to its prepare timeout.
//   - Any other failure, or a retry that was skipped for lack of time, bumps the
//     recent-prepare-fail counter and is passed to handle_remote_failure().
//   - For a potential secondary, a failure still counts toward its ack counter, so a
//     failed learner does not block the commit.
void replica::on_prepare_reply(std::pair<mutation_ptr, partition_status::type> pr,
                               error_code err,
                               dsn::message_ex *request,
                               dsn::message_ex *reply)
{
    // Presumably asserts that replica state is only touched from its owning thread;
    // see `_checker` for the exact guarantee.
    _checker.only_one_thread_access();

    mutation_ptr mu = pr.first;
    partition_status::type target_status = pr.second;

    // skip callback for old mutations: this replica is no longer primary, the ballot has
    // moved on, or the mutation has already been committed.
    if (partition_status::PS_PRIMARY != status() || mu->data.header.ballot < get_ballot() ||
        mu->get_decree() <= last_committed_decree())
        return;

    // After the early return, the mutation's ballot is >= the current ballot.
    // This asserts that it is not newer either.
    CHECK_EQ_MSG(mu->data.header.ballot, get_ballot(), "{}: invalid mutation ballot", mu->name());

    // The target node, and its status as currently recorded by the primary. `st` may
    // differ from `target_status` if membership changed after the prepare was sent.
    ::dsn::rpc_address node = request->to_address;
    partition_status::type st = _primary_states.get_node_status(node);

    // handle reply: the ack comes either from the RPC error or from the reply payload.
    prepare_ack resp;

    // handle error
    if (err != ERR_OK) {
        resp.err = err;
    } else {
        ::dsn::unmarshall(reply, resp);
    }

    // Record the remote receive/reply timestamps and the ack's error on a per-target
    // sub-tracer of the mutation.
    auto send_prepare_tracer = mu->_tracer->sub_tracer(request->to_address.to_string());
    APPEND_EXTERN_POINT(send_prepare_tracer, resp.receive_timestamp, "remote_receive");
    APPEND_EXTERN_POINT(send_prepare_tracer, resp.response_timestamp, "remote_reply");
    ADD_CUSTOM_POINT(send_prepare_tracer, resp.err.to_string());

    // Successful acks are logged at debug level; failed ones at error level.
    if (resp.err == ERR_OK) {
        LOG_DEBUG_PREFIX("mutation {} on_prepare_reply from {}, appro_data_bytes = {}, "
                         "target_status = {}, err = {}",
                         mu->name(),
                         node,
                         mu->appro_data_bytes(),
                         enum_to_string(target_status),
                         resp.err);
    } else {
        LOG_ERROR_PREFIX("mutation {} on_prepare_reply from {}, appro_data_bytes = {}, "
                         "target_status = {}, err = {}",
                         mu->name(),
                         node,
                         mu->appro_data_bytes(),
                         enum_to_string(target_status),
                         resp.err);
    }

    if (resp.err == ERR_OK) {
        // A successful ack must echo the current ballot and this mutation's decree.
        CHECK_EQ(resp.ballot, get_ballot());
        CHECK_EQ(resp.decree, mu->data.header.decree);

        switch (target_status) {
        case partition_status::PS_SECONDARY:
            // The node must still be a secondary in the primary's view. Count its ack,
            // and try to commit once the left secondary-ack count reaches zero.
            CHECK(_primary_states.check_exist(node, partition_status::PS_SECONDARY),
                  "invalid secondary node address, address = {}",
                  node);
            CHECK_GT(mu->left_secondary_ack_count(), 0);
            if (0 == mu->decrease_left_secondary_ack_count()) {
                do_possible_commit_on_primary(mu);
            }
            break;
        case partition_status::PS_POTENTIAL_SECONDARY:
            // Same accounting, using the potential-secondary (learner) ack counter.
            CHECK_GT(mu->left_potential_secondary_ack_count(), 0);
            if (0 == mu->decrease_left_potential_secondary_ack_count()) {
                do_possible_commit_on_primary(mu);
            }
            break;
        default:
            // Any other target status: the ack is ignored; only a warning is logged.
            // NOTE(review): "coz" in the log text is informal; left unchanged here.
            LOG_WARNING_PREFIX("mutation {} prepare ack skipped coz the node is now inactive",
                               mu->name());
            break;
        }
    }

    // failure handling
    else {
        // retry for INACTIVE or TRY_AGAIN if there is still time.
        if (resp.err == ERR_INACTIVE_STATE || resp.err == ERR_TRY_AGAIN) {
            // The timeout budget depends on whether the target is a secondary or a learner.
            int prepare_timeout_ms = (target_status == partition_status::PS_SECONDARY
                                          ? FLAGS_prepare_timeout_ms_for_secondaries
                                          : FLAGS_prepare_timeout_ms_for_potential_secondaries);
            int delay_time_ms = 5; // delay some time before retry to avoid sending too frequently
            // Give up retrying when the prepare is within roughly `delay_time_ms + 2` ms of
            // its timeout (exact semantics: is_prepare_close_to_timeout()). Such failures
            // fall through to the generic failure handling below.
            if (mu->is_prepare_close_to_timeout(delay_time_ms + 2, prepare_timeout_ms)) {
                // NOTE(review): "enought" is a typo in the log text; left unchanged in this
                // comment-only edit.
                LOG_ERROR_PREFIX("mutation {} do not retry prepare to {} for no enought time left, "
                                 "prepare_ts_ms = {}, prepare_timeout_ms = {}, now_ms = {}",
                                 mu->name(),
                                 node,
                                 mu->prepare_ts_ms(),
                                 prepare_timeout_ms,
                                 dsn_now_ms());
            } else {
                LOG_INFO_PREFIX(
                    "mutation {} retry prepare to {} after {} ms", mu->name(), node, delay_time_ms);
                // For a potential secondary, pass along the learner's signature if the node
                // is still in the learners map; otherwise invalid_signature is used.
                int64_t learn_signature = invalid_signature;
                if (target_status == partition_status::PS_POTENTIAL_SECONDARY) {
                    auto it = _primary_states.learners.find(node);
                    if (it != _primary_states.learners.end()) {
                        learn_signature = it->second.signature;
                    }
                }
                // Schedule the retry after `delay_time_ms` on this partition's thread hash.
                // The task is registered with `_tracker`, presumably so it is dropped if the
                // replica is closed before it runs (confirm).
                tasking::enqueue(
                    LPC_DELAY_PREPARE,
                    &_tracker,
                    [this, node, target_status, mu, prepare_timeout_ms, learn_signature] {
                        // need to check status/ballot/decree before sending prepare message,
                        // because the config may have been changed or the mutation may have been
                        // committed during the delay time.
                        if (status() == partition_status::PS_PRIMARY &&
                            get_ballot() == mu->data.header.ballot &&
                            mu->get_decree() > last_committed_decree()) {
                            send_prepare_message(node,
                                                 target_status,
                                                 mu,
                                                 prepare_timeout_ms,
                                                 false,
                                                 learn_signature);
                        }
                    },
                    get_gpid().thread_hash(),
                    std::chrono::milliseconds(delay_time_ms));
                // A retry is pending, so skip failure counting and ack accounting for now.
                return;
            }
        }

        _stub->_counter_replicas_recent_prepare_fail_count->increment();

        // Report the failure using the node's current status `st`.
        // This must run before any of the commit operations below, because a commit may
        // now issue new prepare operations (due to replication throttling).
        handle_remote_failure(st, node, resp.err, "prepare");

        // note: target_status and the (current) status `st` may differ.
        // A failed potential secondary is still counted as acked, so it does not block the
        // commit. A failed secondary's ack count is left untouched here.
        // NOTE(review): presumably the secondary case is resolved by whatever
        // handle_remote_failure() triggers (e.g. reconfiguration) — confirm.
        if (target_status == partition_status::PS_POTENTIAL_SECONDARY) {
            CHECK_GT(mu->left_potential_secondary_ack_count(), 0);
            if (0 == mu->decrease_left_potential_secondary_ack_count()) {
                do_possible_commit_on_primary(mu);
            }
        }
    }
}