in src/replica/replica_config.cpp [663:1030]
// Installs `config` as this replica's configuration and performs the work needed to move
// from the current partition status to `config.status`.
//
// The function runs in these phases:
//   1. sanity checks on ballot and partition id (CHECK aborts on violation);
//   2. rejection of transitions that are currently not allowed (returns false, `_config` untouched);
//   3. `_config` is overwritten and, if the ballot grew, the ballot is written to the init file;
//   4. per-(old status, new status) cleanup / initialization;
//   5. the stub is notified, and the replica starts closing if the new state requires it;
//   6. if the replica is now primary, pending writes are started or failed.
//
// Parameters:
//   config      - the configuration to apply; `config.pid` must equal `get_gpid()`.
//   same_ballot - when true, `config.ballot` is allowed to equal the current ballot;
//                 otherwise it must be strictly greater.
//
// Returns:
//   true  - the configuration was applied (or status and ballot were already identical).
//   false - the transition was rejected, or it was applied and the replica is now being closed.
bool replica::update_local_configuration(const replica_configuration &config,
bool same_ballot /* = false*/)
{
// Test hook: the lambda simply overwrites `_config` and reports success.
// NOTE(review): presumably, when this fail point is enabled, the lambda's result is returned
// instead of running the logic below - confirm against the FAIL_POINT_INJECT_F definition.
FAIL_POINT_INJECT_F("replica_update_local_configuration", [=](dsn::string_view) -> bool {
auto old_status = status();
_config = config;
LOG_INFO_PREFIX(
"update status from {} to {}", enum_to_string(old_status), enum_to_string(status()));
return true;
});
// The ballot must grow, unless the caller explicitly allows re-using the current ballot.
CHECK(config.ballot > get_ballot() || (same_ballot && config.ballot == get_ballot()),
"invalid ballot, {} VS {}",
config.ballot,
get_ballot());
CHECK_EQ(config.pid, get_gpid());
// Snapshot the current state; status() and get_ballot() will reflect `config` once
// `_config` is overwritten below.
partition_status::type old_status = status();
ballot old_ballot = get_ballot();
// skip unnecessary configuration change
if (old_status == config.status && old_ballot == config.ballot) {
return true;
}
// skip invalid change
// but do not disable transitions to partition_status::PS_ERROR as errors
// must be handled immediately
switch (old_status) {
// Once in PS_ERROR, every further change is refused.
case partition_status::PS_ERROR: {
LOG_WARNING_PREFIX("status change from {} @ {} to {} @ {} is not allowed",
enum_to_string(old_status),
old_ballot,
enum_to_string(config.status),
config.ballot);
return false;
} break;
// An inactive replica may only become primary/secondary while the inactive state is transient.
case partition_status::PS_INACTIVE:
if ((config.status == partition_status::PS_PRIMARY ||
config.status == partition_status::PS_SECONDARY) &&
!_inactive_is_transient) {
LOG_WARNING_PREFIX("status change from {} @ {} to {} @ {} is not allowed when "
"inactive state is not transient",
enum_to_string(old_status),
old_ballot,
enum_to_string(config.status),
config.ballot);
return false;
}
break;
// A learner may only go inactive if its learning context can be cleaned up without force.
case partition_status::PS_POTENTIAL_SECONDARY:
if (config.status == partition_status::PS_INACTIVE) {
if (!_potential_secondary_states.cleanup(false)) {
LOG_WARNING_PREFIX("status change from {} @ {} to {} @ {} is not allowed coz "
"learning remote state is still running",
enum_to_string(old_status),
old_ballot,
enum_to_string(config.status),
config.ballot);
return false;
}
}
break;
// A secondary may only leave the secondary role (other than to PS_ERROR) if its context
// can be cleaned up without force.
case partition_status::PS_SECONDARY:
if (config.status != partition_status::PS_SECONDARY &&
config.status != partition_status::PS_ERROR) {
if (!_secondary_states.cleanup(false)) {
// TODO(sunweijie): totally remove this
// Pick the first task that is set; it is only used for the warning below.
dsn::task *native_handle;
if (_secondary_states.checkpoint_task)
native_handle = _secondary_states.checkpoint_task.get();
else if (_secondary_states.checkpoint_completed_task)
native_handle = _secondary_states.checkpoint_completed_task.get();
else if (_secondary_states.catchup_with_private_log_task)
native_handle = _secondary_states.catchup_with_private_log_task.get();
else
native_handle = nullptr;
LOG_WARNING_PREFIX("status change from {} @ {} to {} @ {} is not allowed coz "
"checkpointing {} is still running",
enum_to_string(old_status),
old_ballot,
enum_to_string(config.status),
config.ballot,
fmt::ptr(native_handle));
return false;
}
}
break;
// A replica in partition split is never moved to inactive here.
case partition_status::PS_PARTITION_SPLIT:
if (config.status == partition_status::PS_INACTIVE) {
LOG_WARNING_PREFIX("status change from {} @ {} to {} @ {} is not allowed",
enum_to_string(old_status),
old_ballot,
enum_to_string(config.status),
config.ballot);
return false;
}
break;
default:
break;
}
// From here on the change is accepted. Keep the previous change time so the duration
// spent in the old configuration can be logged at the end.
uint64_t old_ts = _last_config_change_time_ms;
_config = config;
// we should durable the new ballot to prevent the inconsistent state
if (_config.ballot > old_ballot) {
// A failure to persist is only logged; the in-memory configuration is kept either way.
dsn::error_code result = _app->update_init_info_ballot_and_decree(this);
if (result == dsn::ERR_OK) {
LOG_INFO_PREFIX(
"update ballot to init file from {} to {} OK", old_ballot, _config.ballot);
} else {
LOG_WARNING_PREFIX(
"update ballot to init file from {} to {} {}", old_ballot, _config.ballot, result);
}
// Runs on every ballot increase, regardless of whether persisting succeeded.
_split_mgr->parent_cleanup_split_context();
}
_last_config_change_time_ms = dsn_now_ms();
CHECK_GE(max_prepared_decree(), last_committed_decree());
_bulk_loader->clear_bulk_load_states_if_needed(old_status, config.status);
// Notice: there are five ways that a primary can change its partition_status:
// 1, primary changes partition config, such as add/remove secondary
// 2, downgrade to secondary because of load balance
// 3, disconnected from meta-server
// 4, connected with meta-server
// 5, crash
// here, we just need to care about case 1, 2, 3 and 4, ignore case 5
// the way that partition status changes is:
// case 1: primary -> ps_inactive & _inactive_is_transient = true -> primary
// case 2: primary -> ps_inactive & _inactive_is_transient = true -> secondary
// case 3: primary -> ps_inactive & _inactive_is_transient = true
// case 4: ps_inactive & _inactive_is_transient = true -> primary or secondary
// the way we decide whether the primary stops uploading the backup checkpoint is that case 1
// continues uploading, others just stop uploading
//
// Transition work, keyed first by the old status and then by the new status.
switch (old_status) {
case partition_status::PS_PRIMARY:
cleanup_preparing_mutations(false);
switch (config.status) {
case partition_status::PS_PRIMARY:
replay_prepare_list();
break;
case partition_status::PS_INACTIVE:
// The cleanup flag is true only when the ballot changed.
_primary_states.cleanup(old_ballot != config.ballot);
// here we use whether the ballot changes and whether we are disconnected from meta to
// distinguish the different cases mentioned above
if (old_ballot == config.ballot && _stub->is_connected()) {
// case 1 and case 2, just continue uploading
// (in case 2, we stop uploading when it changes to secondary)
} else {
set_backup_context_cancel();
clear_cold_backup_state();
}
break;
case partition_status::PS_SECONDARY:
case partition_status::PS_ERROR:
_primary_states.cleanup(true);
// only load balance will cause primary -> secondary
// and we just stop uploading and release the cold_backup_state, and let the new primary
// upload
set_backup_context_cancel();
clear_cold_backup_state();
break;
case partition_status::PS_POTENTIAL_SECONDARY:
CHECK(false, "invalid execution path");
break;
default:
CHECK(false, "invalid execution path");
}
break;
case partition_status::PS_SECONDARY:
cleanup_preparing_mutations(false);
if (config.status != partition_status::PS_SECONDARY) {
// if primary change the ballot, secondary will update ballot from A to
// A+1, we don't need clear cold backup context when this case
//
// if secondary upgrade to primary, we must cancel & clear cold_backup_state, because
// new-primary must check whether backup is already completed by previous-primary
set_backup_context_cancel();
clear_cold_backup_state();
}
switch (config.status) {
case partition_status::PS_PRIMARY:
init_group_check();
replay_prepare_list();
break;
case partition_status::PS_SECONDARY:
break;
case partition_status::PS_POTENTIAL_SECONDARY:
// prevent further 2pc
// wait next group check or explicit learn for real learning
_potential_secondary_states.learning_status = learner_status::LearningWithoutPrepare;
break;
case partition_status::PS_INACTIVE:
break;
case partition_status::PS_ERROR:
// _secondary_states.cleanup(true); => do it in close as it may block
break;
default:
CHECK(false, "invalid execution path");
}
break;
case partition_status::PS_POTENTIAL_SECONDARY:
switch (config.status) {
case partition_status::PS_PRIMARY:
CHECK(false, "invalid execution path");
break;
case partition_status::PS_SECONDARY:
// Learning finished: cut the prepare list back to what the app has committed.
_prepare_list->truncate(_app->last_committed_decree());
// using force cleanup now as all tasks must be done already
CHECK_PREFIX_MSG(_potential_secondary_states.cleanup(true),
"potential secondary context cleanup failed");
check_state_completeness();
break;
case partition_status::PS_POTENTIAL_SECONDARY:
break;
case partition_status::PS_INACTIVE:
break;
case partition_status::PS_ERROR:
_prepare_list->reset(_app->last_committed_decree());
// Non-forced cleanup; its result is intentionally not checked here.
_potential_secondary_states.cleanup(false);
// => do this in close as it may block
// CHECK_PREFIX_MSG(_potential_secondary_states.cleanup(true),
//                  "potential secondary context cleanup failed");
break;
default:
CHECK(false, "invalid execution path");
}
break;
case partition_status::PS_PARTITION_SPLIT:
switch (config.status) {
case partition_status::PS_PRIMARY:
_split_states.cleanup(true);
init_group_check();
replay_prepare_list();
break;
case partition_status::PS_SECONDARY:
_split_states.cleanup(true);
break;
case partition_status::PS_POTENTIAL_SECONDARY:
CHECK(false, "invalid execution path");
break;
case partition_status::PS_INACTIVE:
// Not expected in practice: the validation switch above rejects this transition.
break;
case partition_status::PS_ERROR:
_split_states.cleanup(false);
break;
default:
CHECK(false, "invalid execution path");
}
break;
case partition_status::PS_INACTIVE:
if (config.status != partition_status::PS_PRIMARY || !_inactive_is_transient) {
// except for case 1, we need stop uploading backup checkpoint
set_backup_context_cancel();
clear_cold_backup_state();
}
// Every branch that leaves PS_INACTIVE also clears the transient flag.
switch (config.status) {
case partition_status::PS_PRIMARY:
CHECK(_inactive_is_transient, "must be in transient state for being primary next");
_inactive_is_transient = false;
init_group_check();
replay_prepare_list();
break;
case partition_status::PS_SECONDARY:
CHECK(_inactive_is_transient, "must be in transient state for being secondary next");
_inactive_is_transient = false;
break;
case partition_status::PS_POTENTIAL_SECONDARY:
_inactive_is_transient = false;
break;
case partition_status::PS_INACTIVE:
break;
case partition_status::PS_ERROR:
// => do this in close as it may block
// if (_inactive_is_transient)
// {
//    _secondary_states.cleanup(true);
// }
if (_inactive_is_transient) {
_primary_states.cleanup(true);
_secondary_states.cleanup(false);
}
_inactive_is_transient = false;
break;
default:
CHECK(false, "invalid execution path");
}
break;
// NOTE(review): the validation switch above returns false for every change whose old status
// is PS_ERROR, so this case looks unreachable on the normal path - confirm before relying on it.
case partition_status::PS_ERROR:
switch (config.status) {
case partition_status::PS_PRIMARY:
CHECK(false, "invalid execution path");
break;
case partition_status::PS_SECONDARY:
CHECK(false, "invalid execution path");
break;
case partition_status::PS_POTENTIAL_SECONDARY:
CHECK(false, "invalid execution path");
break;
case partition_status::PS_INACTIVE:
CHECK(false, "invalid execution path");
break;
case partition_status::PS_ERROR:
break;
default:
CHECK(false, "invalid execution path");
}
break;
default:
CHECK(false, "invalid execution path");
}
// Summarize the transition: old/new status and ballot, prepare-list and app decrees, and
// how long the previous configuration was in effect.
LOG_INFO_PREFIX(
"status change {} @ {} => {} @ {}, pre({}, {}), app({}, {}), duration = {} ms, {}",
enum_to_string(old_status),
old_ballot,
enum_to_string(status()),
get_ballot(),
_prepare_list->max_decree(),
_prepare_list->last_committed_decree(),
_app->last_committed_decree(),
_app->last_durable_decree(),
_last_config_change_time_ms - old_ts,
boost::lexical_cast<std::string>(_config));
if (status() != old_status) {
// The replica is closed when it entered PS_ERROR, or became inactive under a newer ballot.
bool is_closing =
(status() == partition_status::PS_ERROR ||
(status() == partition_status::PS_INACTIVE && get_ballot() > old_ballot));
_stub->notify_replica_state_update(config, is_closing);
if (is_closing) {
LOG_INFO_PREFIX("being close ...");
_stub->begin_close_replica(this);
// The configuration has been applied, but false is returned because the replica is closing.
return false;
}
} else {
// Status unchanged (only the ballot moved): still notify, never closing.
_stub->notify_replica_state_update(config, false);
}
// start pending mutations if necessary
if (status() == partition_status::PS_PRIMARY) {
mutation_ptr next = _primary_states.write_queue.check_possible_work(
static_cast<int>(_prepare_list->max_decree() - last_committed_decree()));
if (next) {
init_prepare(next, false);
}
// Primary plus its secondaries are fewer than the 2PC minimum: fail every queued client
// write with ERR_NOT_ENOUGH_MEMBER instead of leaving it in the queue.
if (_primary_states.membership.secondaries.size() + 1 <
_options->app_mutation_2pc_min_replica_count(_app_info.max_replica_count)) {
std::vector<mutation_ptr> queued;
_primary_states.write_queue.clear(queued);
for (auto &m : queued) {
for (auto &r : m->client_requests) {
response_client_write(r, ERR_NOT_ENOUGH_MEMBER);
}
}
}
}
return true;
}