in src/replica/replica_2pc.cpp [521:663]
// Handles an incoming prepare request.
//
// The request payload is decoded into a replica configuration followed by a mutation. The
// mutation then goes through a series of guards (ballot, replica state, learning state,
// already-committed, duplicate prepare). If all of them pass, it is put into the prepare list
// and appended to the private log; on_append_log_completed() is bound as the append callback.
//
// Rejections are answered through ack_prepare_message() with an error code, except for a stale
// ballot, where the function returns without replying.
void replica::on_prepare(dsn::message_ex *request)
{
    _checker.only_one_thread_access();

    replica_configuration rconfig;
    mutation_ptr mu;
    bool pop_all = false;
    {
        // The configuration is unmarshalled first, then the mutation is read from the same
        // stream.
        rpc_read_stream reader(request);
        unmarshall(reader, rconfig, DSF_THRIFT_BINARY);
        mu = mutation::read_from(reader, request);

        // `split_sync_to_child` and `pop_all` are copied out of the received configuration
        // and then reset on it before the configuration is used any further.
        mu->set_is_sync_to_child(rconfig.split_sync_to_child);
        pop_all = rconfig.pop_all;
        rconfig.split_sync_to_child = false;
        rconfig.pop_all = false;
    }

    const decree target_decree = mu->data.header.decree;

    LOG_DEBUG_PREFIX("mutation {} on_prepare", mu->name());
    mu->_tracer->set_name(fmt::format("mutation[{}]", mu->name()));
    mu->_tracer->set_description("secondary");
    ADD_POINT(mu->_tracer);

    // The mutation header and the accompanying configuration must agree with each other.
    CHECK_EQ(mu->data.header.pid, rconfig.pid);
    CHECK_EQ(mu->data.header.ballot, rconfig.ballot);

    // Guard: request carries an older ballot than the local one.
    if (mu->data.header.ballot < get_ballot()) {
        LOG_ERROR_PREFIX("mutation {} on_prepare skipped due to old view", mu->name());
        // no need response because the rpc should have been cancelled on primary in this case
        return;
    }

    // Guard: request carries a newer ballot, so the local configuration has to be updated
    // first; a failed update rejects the prepare.
    if (rconfig.ballot > get_ballot() && !update_local_configuration(rconfig)) {
        LOG_ERROR_PREFIX(
            "mutation {} on_prepare failed as update local configuration failed, state = {}",
            mu->name(),
            enum_to_string(status()));
        ack_prepare_message(ERR_INVALID_STATE, mu);
        return;
    }

    // Guard: inactive or errored replicas cannot accept prepares.
    if (partition_status::PS_INACTIVE == status() || partition_status::PS_ERROR == status()) {
        LOG_ERROR_PREFIX("mutation {} on_prepare failed as invalid replica state, state = {}",
                         mu->name(),
                         enum_to_string(status()));
        // ERR_INACTIVE_STATE is only used for an inactive replica flagged as transient;
        // everything else is ERR_INVALID_STATE.
        const error_code ack_code =
            (partition_status::PS_INACTIVE == status() && _inactive_is_transient)
                ? ERR_INACTIVE_STATE
                : ERR_INVALID_STATE;
        ack_prepare_message(ack_code, mu);
        return;
    }

    if (partition_status::PS_POTENTIAL_SECONDARY == status()) {
        // Guard: the signature in the request must match the local learning version.
        if (rconfig.learner_signature != _potential_secondary_states.learning_version) {
            LOG_ERROR_PREFIX("mutation {} on_prepare failed as unmatched learning signature, state "
                             "= {}, old_signature[{:#018x}] vs new_signature[{:#018x}]",
                             mu->name(),
                             enum_to_string(status()),
                             _potential_secondary_states.learning_version,
                             rconfig.learner_signature);
            handle_learning_error(ERR_INVALID_STATE, false);
            ack_prepare_message(ERR_INVALID_STATE, mu);
            return;
        }

        // Guard: only LearningWithPrepare and LearningSucceeded let the prepare continue.
        auto learn_state = _potential_secondary_states.learning_status;
        const bool prepare_allowed = learn_state == learner_status::LearningWithPrepare ||
                                     learn_state == learner_status::LearningSucceeded;
        if (!prepare_allowed) {
            // if prepare requests are received when learning status is changing from
            // LearningWithoutPrepare to LearningWithPrepare, we ack ERR_TRY_AGAIN.
            const error_code ack_code = learn_state == learner_status::LearningWithoutPrepare
                                            ? ERR_TRY_AGAIN
                                            : ERR_INVALID_STATE;
            LOG_ERROR_PREFIX(
                "mutation {} on_prepare skipped as invalid learning status, state = {}, "
                "learning_status = {}, ack {}",
                mu->name(),
                enum_to_string(status()),
                enum_to_string(learn_state),
                ack_code);
            ack_prepare_message(ack_code, mu);
            return;
        }
    }

    CHECK_EQ(rconfig.status, status());

    // Guard: decree is not beyond the last committed one, so simply acknowledge it.
    if (target_decree <= last_committed_decree()) {
        ack_prepare_message(ERR_OK, mu);
        return;
    }

    // real prepare start
    _uniq_timestamp_us.try_update(mu->data.header.timestamp);

    // Guard: the prepare list already holds a mutation with the same decree and ballot, so
    // this request is a duplicate of it.
    auto existing = _prepare_list->get_mutation_by_decree(target_decree);
    if (existing != nullptr && existing->data.header.ballot == mu->data.header.ballot) {
        if (!existing->is_logged()) {
            // not logged, combine duplicate request to old mutation
            existing->add_prepare_request(request);
        } else {
            // already logged, just response ERR_OK
            ack_prepare_message(ERR_OK, mu);
        }
        return;
    }

    error_code err = _prepare_list->prepare(mu, status(), pop_all);
    CHECK_EQ_MSG(err, ERR_OK, "prepare mutation failed");

    // Guard: only a secondary or potential secondary proceeds to the log append.
    if (partition_status::PS_POTENTIAL_SECONDARY != status() &&
        partition_status::PS_SECONDARY != status()) {
        LOG_ERROR_PREFIX("mutation {} on_prepare failed as invalid replica state, state = {}",
                         mu->name(),
                         enum_to_string(status()));
        ack_prepare_message(ERR_INVALID_STATE, mu);
        return;
    }

    // The decree must stay within FLAGS_max_mutation_count_in_prepare_list of the last
    // committed decree.
    CHECK_LE_MSG(mu->data.header.decree,
                 last_committed_decree() + FLAGS_max_mutation_count_in_prepare_list,
                 "last_committed_decree: {}, FLAGS_max_mutation_count_in_prepare_list: {}",
                 last_committed_decree(),
                 FLAGS_max_mutation_count_in_prepare_list);

    if (_split_mgr->is_splitting()) {
        _split_mgr->copy_mutation(mu);
    }

    // The mutation must not have a log task yet; after the append it must have one.
    CHECK(mu->log_task() == nullptr, "");
    mu->log_task() = _private_log->append(mu,
                                          LPC_WRITE_REPLICATION_LOG,
                                          &_tracker,
                                          std::bind(&replica::on_append_log_completed,
                                                    this,
                                                    mu,
                                                    std::placeholders::_1,
                                                    std::placeholders::_2),
                                          get_gpid().thread_hash());
    CHECK_NOTNULL(mu->log_task(), "");
}