in src/replica/replica_2pc.cpp [637:782]
// Callback handling the outcome of a prepare request that was sent for a mutation.
//
// @param pr      the mutation being prepared, paired with the status the target node
//                was addressed as when the prepare was sent (the node's current status
//                may differ by the time this callback runs)
// @param err     error of the request itself; when it is not ERR_OK, `reply` is not
//                decoded and this value is used as the ack's error
// @param request the prepare request; its `to_address` identifies the replying node
// @param reply   the response message, decoded into a prepare_ack when `err` is ERR_OK
//
// Outline:
//   1. Drop the reply if this replica is no longer primary, the mutation carries an
//      older ballot, or the mutation has already been committed.
//   2. On success, count the ack for the matching group (secondary or potential
//      secondary) and try to commit once that group's outstanding count reaches zero.
//   3. On ERR_INACTIVE_STATE / ERR_TRY_AGAIN, re-send the prepare after a short delay
//      as long as the prepare timeout is not about to expire.
//   4. Otherwise treat it as a remote failure: bump the failure counter, report the
//      failure, and for potential secondaries still count the ack.
void replica::on_prepare_reply(std::pair<mutation_ptr, partition_status::type> pr,
                               error_code err,
                               dsn::message_ex *request,
                               dsn::message_ex *reply)
{
    _checker.only_one_thread_access();

    mutation_ptr mu = pr.first;
    const partition_status::type target_status = pr.second;

    // Ignore callbacks that belong to old mutations.
    if (partition_status::PS_PRIMARY != status()) {
        return;
    }
    if (mu->data.header.ballot < get_ballot()) {
        return;
    }
    if (mu->get_decree() <= last_committed_decree()) {
        return;
    }

    CHECK_EQ_MSG(mu->data.header.ballot, get_ballot(), "{}: invalid mutation ballot", mu->name());

    ::dsn::rpc_address node = request->to_address;
    // Status of the node as currently recorded by the primary; captured up front and
    // passed to the failure handler below.
    partition_status::type node_status = _primary_states.get_node_status(node);

    // Build the ack: decode the reply when the request succeeded, otherwise carry the
    // request error in the ack.
    prepare_ack resp;
    if (err == ERR_OK) {
        ::dsn::unmarshall(reply, resp);
    } else {
        resp.err = err;
    }

    auto send_prepare_tracer = mu->_tracer->sub_tracer(node.to_string());
    APPEND_EXTERN_POINT(send_prepare_tracer, resp.receive_timestamp, "remote_receive");
    APPEND_EXTERN_POINT(send_prepare_tracer, resp.response_timestamp, "remote_reply");
    ADD_CUSTOM_POINT(send_prepare_tracer, resp.err.to_string());

    // ---- success path ----
    if (resp.err == ERR_OK) {
        LOG_DEBUG_PREFIX("mutation {} on_prepare_reply from {}, appro_data_bytes = {}, "
                         "target_status = {}, err = {}",
                         mu->name(),
                         node,
                         mu->appro_data_bytes(),
                         enum_to_string(target_status),
                         resp.err);

        CHECK_EQ(resp.ballot, get_ballot());
        CHECK_EQ(resp.decree, mu->data.header.decree);

        if (target_status == partition_status::PS_SECONDARY) {
            CHECK(_primary_states.check_exist(node, partition_status::PS_SECONDARY),
                  "invalid secondary node address, address = {}",
                  node);
            CHECK_GT(mu->left_secondary_ack_count(), 0);
            if (mu->decrease_left_secondary_ack_count() == 0) {
                do_possible_commit_on_primary(mu);
            }
        } else if (target_status == partition_status::PS_POTENTIAL_SECONDARY) {
            CHECK_GT(mu->left_potential_secondary_ack_count(), 0);
            if (mu->decrease_left_potential_secondary_ack_count() == 0) {
                do_possible_commit_on_primary(mu);
            }
        } else {
            LOG_WARNING_PREFIX("mutation {} prepare ack skipped coz the node is now inactive",
                               mu->name());
        }
        return;
    }

    // ---- failure path ----
    LOG_ERROR_PREFIX("mutation {} on_prepare_reply from {}, appro_data_bytes = {}, "
                     "target_status = {}, err = {}",
                     mu->name(),
                     node,
                     mu->appro_data_bytes(),
                     enum_to_string(target_status),
                     resp.err);

    // INACTIVE and TRY_AGAIN are retried while there is still time before the timeout.
    const bool retryable = resp.err == ERR_INACTIVE_STATE || resp.err == ERR_TRY_AGAIN;
    if (retryable) {
        int prepare_timeout_ms = (target_status == partition_status::PS_SECONDARY
                                      ? FLAGS_prepare_timeout_ms_for_secondaries
                                      : FLAGS_prepare_timeout_ms_for_potential_secondaries);
        // Wait a little before retrying so prepares are not re-sent too frequently.
        const int retry_delay_ms = 5;

        if (!mu->is_prepare_close_to_timeout(retry_delay_ms + 2, prepare_timeout_ms)) {
            LOG_INFO_PREFIX(
                "mutation {} retry prepare to {} after {} ms", mu->name(), node, retry_delay_ms);

            // For a potential secondary, pass along the signature currently recorded
            // for this learner (if any); otherwise the invalid signature is used.
            int64_t learn_signature = invalid_signature;
            if (target_status == partition_status::PS_POTENTIAL_SECONDARY) {
                auto learner = _primary_states.learners.find(node);
                if (learner != _primary_states.learners.end()) {
                    learn_signature = learner->second.signature;
                }
            }

            tasking::enqueue(
                LPC_DELAY_PREPARE,
                &_tracker,
                [this, node, target_status, mu, prepare_timeout_ms, learn_signature] {
                    // Re-validate status/ballot/decree before re-sending: during the
                    // delay the configuration may have changed or the mutation may
                    // already have been committed.
                    const bool still_pending = status() == partition_status::PS_PRIMARY &&
                                               get_ballot() == mu->data.header.ballot &&
                                               mu->get_decree() > last_committed_decree();
                    if (!still_pending) {
                        return;
                    }
                    send_prepare_message(
                        node, target_status, mu, prepare_timeout_ms, false, learn_signature);
                },
                get_gpid().thread_hash(),
                std::chrono::milliseconds(retry_delay_ms));
            return;
        }

        // Too close to the timeout: give up retrying and fall through to the regular
        // failure handling below.
        LOG_ERROR_PREFIX("mutation {} do not retry prepare to {} for no enought time left, "
                         "prepare_ts_ms = {}, prepare_timeout_ms = {}, now_ms = {}",
                         mu->name(),
                         node,
                         mu->prepare_ts_ms(),
                         prepare_timeout_ms,
                         dsn_now_ms());
    }

    _stub->_counter_replicas_recent_prepare_fail_count->increment();

    // This must run before any later commit operation, because commit operations may
    // now trigger new prepare operations due to replication throttling.
    handle_remote_failure(node_status, node, resp.err, "prepare");

    // Note: the target status and the node's current status may differ here.
    if (target_status == partition_status::PS_POTENTIAL_SECONDARY) {
        CHECK_GT(mu->left_potential_secondary_ack_count(), 0);
        if (mu->decrease_left_potential_secondary_ack_count() == 0) {
            do_possible_commit_on_primary(mu);
        }
    }
}