in src/replica/replica_2pc.cpp [237:385]
// Starts the two-phase commit of `mu` on the primary replica.
//
// Steps, in order:
//  1. Stamp the mutation with the last committed decree and the current ballot. When `mu`
//     carries no decree yet, it gets the next decree after the prepare list's max decree and
//     a fresh unique timestamp; otherwise its existing decree is kept.
//  2. Reject it, replying the error to every client request batched in `mu`, when:
//     - its decree is more than FLAGS_staleness_for_commit ahead of the last committed
//       decree (ERR_CAPACITY_EXCEEDED);
//     - its leading updates are bulk-load updates while secondaries + 1 is below the
//       membership's max_replica_count (ERR_NOT_ENOUGH_MEMBER);
//     - secondaries + 1 is below the 2PC minimum replica count and this is not a
//       reconciliation (ERR_NOT_ENOUGH_MEMBER);
//     - the local prepare into `_prepare_list` fails (that error is replied).
//  3. Send prepare messages to every secondary and to each learner whose
//     prepare_start_decree is set and not beyond this decree. Copy the mutation to the split
//     manager when a split is in progress.
//  4. If the mutation is already logged, try to commit it right away. Otherwise append it to
//     the private log (completion goes to on_append_log_completed) and optionally delay
//     client traffic when the reported pending log size reaches the configured threshold.
//
// @param mu                          the mutation to prepare; its header/id/timestamp are
//                                    updated in place.
// @param reconciliation              when true, the minimum-replica-count check is skipped so
//                                    every prepared mutation can still be committed (PacificA).
// @param pop_all_committed_mutations forwarded to the local prepare and to every prepare message.
void replica::init_prepare(mutation_ptr &mu, bool reconciliation, bool pop_all_committed_mutations)
{
    CHECK_EQ(partition_status::PS_PRIMARY, status());

    mu->_tracer->set_description("primary");
    ADD_POINT(mu->_tracer);

    // Every failure path replies the same error to all client requests batched in `mu`.
    const auto reply_error_to_clients = [this, &mu](error_code err) {
        for (auto &r : mu->client_requests) {
            response_client_write(r, err);
        }
    };

    const auto request_count = mu->client_requests.size();
    mu->data.header.last_committed_decree = last_committed_decree();

    dsn_log_level_t level = LOG_LEVEL_DEBUG;
    if (mu->data.header.decree == invalid_decree) {
        mu->set_id(get_ballot(), _prepare_list->max_decree() + 1);
        // print a debug log if necessary
        if (FLAGS_prepare_decree_gap_for_debug_logging > 0 &&
            mu->get_decree() % FLAGS_prepare_decree_gap_for_debug_logging == 0) {
            level = LOG_LEVEL_INFO;
        }
        mu->set_timestamp(_uniq_timestamp_us.next());
    } else {
        mu->set_id(get_ballot(), mu->data.header.decree);
    }

    mu->_tracer->set_name(fmt::format("mutation[{}]", mu->name()));
    dlog(level,
         "%s: mutation %s init_prepare, mutation_tid=%" PRIu64,
         name(),
         mu->name(),
         mu->tid());

    // child should prepare mutation synchronously
    mu->set_is_sync_to_child(_primary_states.sync_send_write_request);

    // check bounded staleness
    if (mu->data.header.decree > last_committed_decree() + FLAGS_staleness_for_commit) {
        reply_error_to_clients(ERR_CAPACITY_EXCEEDED);
        return;
    }

    // stop prepare bulk load ingestion if there are secondaries unalive;
    // only the leading run of bulk-load updates is inspected
    for (size_t i = 0; i < request_count; ++i) {
        const mutation_update &update = mu->data.updates[i];
        if (update.code != dsn::apps::RPC_RRDB_RRDB_BULK_LOAD) {
            break;
        }
        LOG_INFO_PREFIX("try to prepare bulk load mutation({})", mu->name());
        if (static_cast<int>(_primary_states.membership.secondaries.size()) + 1 <
            _primary_states.membership.max_replica_count) {
            reply_error_to_clients(ERR_NOT_ENOUGH_MEMBER);
            return;
        }
    }

    // stop prepare if there are too few replicas unless it's a reconciliation
    // for reconciliation, we should ensure every prepared mutation to be committed
    // please refer to PacificA paper
    if (static_cast<int>(_primary_states.membership.secondaries.size()) + 1 <
            _options->app_mutation_2pc_min_replica_count(_app_info.max_replica_count) &&
        !reconciliation) {
        reply_error_to_clients(ERR_NOT_ENOUGH_MEMBER);
        return;
    }

    CHECK_GT(mu->data.header.decree, last_committed_decree());

    // local prepare
    const error_code err =
        _prepare_list->prepare(mu, partition_status::PS_PRIMARY, pop_all_committed_mutations);
    if (err != ERR_OK) {
        reply_error_to_clients(err);
        return;
    }

    // remote prepare
    mu->set_prepare_ts();
    mu->set_left_secondary_ack_count(
        static_cast<unsigned int>(_primary_states.membership.secondaries.size()));
    for (const auto &secondary : _primary_states.membership.secondaries) {
        send_prepare_message(secondary,
                             partition_status::PS_SECONDARY,
                             mu,
                             FLAGS_prepare_timeout_ms_for_secondaries,
                             pop_all_committed_mutations);
    }

    // Same width as the secondary ack count above, so the counter cannot wrap the way a
    // uint8_t would.
    unsigned int potential_secondary_count = 0;
    for (const auto &learner : _primary_states.learners) {
        if (learner.second.prepare_start_decree != invalid_decree &&
            mu->data.header.decree >= learner.second.prepare_start_decree) {
            send_prepare_message(learner.first,
                                 partition_status::PS_POTENTIAL_SECONDARY,
                                 mu,
                                 FLAGS_prepare_timeout_ms_for_potential_secondaries,
                                 pop_all_committed_mutations,
                                 learner.second.signature);
            ++potential_secondary_count;
        }
    }
    mu->set_left_potential_secondary_ack_count(potential_secondary_count);

    if (_split_mgr->is_splitting()) {
        _split_mgr->copy_mutation(mu);
    }

    if (mu->is_logged()) {
        do_possible_commit_on_primary(mu);
    } else {
        CHECK_EQ(mu->data.header.log_offset, invalid_offset);
        CHECK(mu->log_task() == nullptr, "");

        // Initialized so that an append() path which does not report a size cannot feed an
        // indeterminate value into the throttling check below.
        int64_t pending_size = 0;
        mu->log_task() = _private_log->append(mu,
                                              LPC_WRITE_REPLICATION_LOG,
                                              &_tracker,
                                              std::bind(&replica::on_append_log_completed,
                                                        this,
                                                        mu,
                                                        std::placeholders::_1,
                                                        std::placeholders::_2),
                                              get_gpid().thread_hash(),
                                              &pending_size);
        CHECK_NOTNULL(mu->log_task(), "");

        // Widen before multiplying so a large KB threshold cannot overflow a 32-bit int.
        const int64_t throttling_threshold_bytes =
            static_cast<int64_t>(FLAGS_log_shared_pending_size_throttling_threshold_kb) * 1024;
        if (FLAGS_log_shared_pending_size_throttling_threshold_kb > 0 &&
            FLAGS_log_shared_pending_size_throttling_delay_ms > 0 &&
            pending_size >= throttling_threshold_bytes) {
            const int delay_ms = FLAGS_log_shared_pending_size_throttling_delay_ms;
            for (dsn::message_ex *r : mu->client_requests) {
                // Skip null request slots and requests that have no session to delay.
                if (r != nullptr && r->io_session != nullptr &&
                    r->io_session->delay_recv(delay_ms)) {
                    LOG_WARNING("too large pending shared log ({}), delay traffic from {} for {} "
                                "milliseconds",
                                pending_size,
                                r->header->from_address,
                                delay_ms);
                }
            }
        }
    }

    _primary_states.last_prepare_ts_ms = mu->prepare_ts_ms();
}