bool replica::update_local_configuration()

in src/replica/replica_config.cpp [687:1055]


// Installs a new partition configuration on this replica and runs the side effects required
// by the resulting status transition (old status -> config.status).
//
// Parameters:
//   config      - the configuration to install; config.pid must equal get_gpid().
//   same_ballot - when true, config.ballot may be equal to the current ballot; otherwise it
//                 must be strictly greater. Both conditions are enforced by CHECK below.
//
// Returns:
//   true  - the requested status and ballot are already in effect (nothing is done), or the
//           configuration was installed and the replica stays open.
//   false - the transition was rejected by the validation switch (_config is not assigned),
//           or the configuration was installed but the replica has started closing.
bool replica::update_local_configuration(const replica_configuration &config,
                                         bool same_ballot /* = false*/)
{
    // Test hook: the lambda only overwrites _config, logs the status change and returns true.
    // NOTE(review): FAIL_POINT_INJECT_F is defined elsewhere; presumably the lambda runs in
    // place of the rest of this function when the fail point is enabled -- confirm.
    FAIL_POINT_INJECT_F("replica_update_local_configuration", [=](std::string_view) -> bool {
        auto old_status = status();
        _config = config;
        LOG_INFO_PREFIX(
            "update status from {} to {}", enum_to_string(old_status), enum_to_string(status()));
        return true;
    });

    // The ballot must increase, unless the caller explicitly allows re-using the same ballot.
    CHECK(config.ballot > get_ballot() || (same_ballot && config.ballot == get_ballot()),
          "invalid ballot, {} VS {}",
          config.ballot,
          get_ballot());
    CHECK_EQ(config.pid, get_gpid());

    // Snapshot the current state; everything below compares against these values.
    partition_status::type old_status = status();
    ballot old_ballot = get_ballot();

    // skip unnecessary configuration change
    if (old_status == config.status && old_ballot == config.ballot) {
        return true;
    }

    // skip invalid change
    // but do not disable transitions to partition_status::PS_ERROR as errors
    // must be handled immediately
    //
    // Every rejected transition logs a warning and returns false before _config is assigned.
    switch (old_status) {
    case partition_status::PS_ERROR: {
        // Once in PS_ERROR, no further status/ballot change is accepted.
        LOG_WARNING_PREFIX("status change from {} @ {} to {} @ {} is not allowed",
                           enum_to_string(old_status),
                           old_ballot,
                           enum_to_string(config.status),
                           config.ballot);
        return false;
    } break;
    case partition_status::PS_INACTIVE:
        // Becoming primary/secondary from inactive is only allowed while the inactive state
        // is flagged as transient.
        if ((config.status == partition_status::PS_PRIMARY ||
             config.status == partition_status::PS_SECONDARY) &&
            !_inactive_is_transient) {
            LOG_WARNING_PREFIX("status change from {} @ {} to {} @ {} is not allowed when "
                               "inactive state is not transient",
                               enum_to_string(old_status),
                               old_ballot,
                               enum_to_string(config.status),
                               config.ballot);
            return false;
        }
        break;
    case partition_status::PS_POTENTIAL_SECONDARY:
        if (config.status == partition_status::PS_INACTIVE) {
            // Non-forced cleanup (cleanup(true) is used as the forced variant further down);
            // a false result is reported as learning still being in progress.
            if (!_potential_secondary_states.cleanup(false)) {
                LOG_WARNING_PREFIX("status change from {} @ {} to {} @ {} is not allowed coz "
                                   "learning remote state is still running",
                                   enum_to_string(old_status),
                                   old_ballot,
                                   enum_to_string(config.status),
                                   config.ballot);
                return false;
            }
        }
        break;
    case partition_status::PS_SECONDARY:
        // Leaving PS_SECONDARY (except towards PS_ERROR) requires the secondary context to be
        // cleaned up first; if that fails, the transition is rejected.
        if (config.status != partition_status::PS_SECONDARY &&
            config.status != partition_status::PS_ERROR) {
            if (!_secondary_states.cleanup(false)) {
                // TODO(sunweijie): totally remove this
                // Pick the first non-null task purely so its address can be logged below.
                dsn::task *native_handle;
                if (_secondary_states.checkpoint_task)
                    native_handle = _secondary_states.checkpoint_task.get();
                else if (_secondary_states.checkpoint_completed_task)
                    native_handle = _secondary_states.checkpoint_completed_task.get();
                else if (_secondary_states.catchup_with_private_log_task)
                    native_handle = _secondary_states.catchup_with_private_log_task.get();
                else
                    native_handle = nullptr;

                LOG_WARNING_PREFIX("status change from {} @ {} to {} @ {} is not allowed coz "
                                   "checkpointing {} is still running",
                                   enum_to_string(old_status),
                                   old_ballot,
                                   enum_to_string(config.status),
                                   config.ballot,
                                   fmt::ptr(native_handle));
                return false;
            }
        }
        break;
    case partition_status::PS_PARTITION_SPLIT:
        // A replica in PS_PARTITION_SPLIT may not go to PS_INACTIVE.
        if (config.status == partition_status::PS_INACTIVE) {
            LOG_WARNING_PREFIX("status change from {} @ {} to {} @ {} is not allowed",
                               enum_to_string(old_status),
                               old_ballot,
                               enum_to_string(config.status),
                               config.ballot);
            return false;
        }
        break;
    default:
        break;
    }

    // The transition is accepted from here on: install the new configuration.
    // old_ts is kept only to report the time since the previous change in the log line below.
    uint64_t old_ts = _last_config_change_time_ms;
    _config = config;
    // the new ballot must be made durable to prevent an inconsistent state
    if (_config.ballot > old_ballot) {
        dsn::error_code result = _app->update_init_info_ballot_and_decree(this);
        if (result == dsn::ERR_OK) {
            LOG_INFO_PREFIX(
                "update ballot to init file from {} to {} OK", old_ballot, _config.ballot);
        } else {
            // NOTE(review): a failure to persist is only logged; execution continues with the
            // new in-memory configuration.
            LOG_WARNING_PREFIX(
                "update ballot to init file from {} to {} {}", old_ballot, _config.ballot, result);
        }
        _split_mgr->parent_cleanup_split_context();
    }
    _last_config_change_time_ms = dsn_now_ms();
    CHECK_GE(max_prepared_decree(), last_committed_decree());

    _bulk_loader->clear_bulk_load_states_if_needed(old_status, config.status);

    // Notice: there are five ways in which a primary can change its partition_status
    //   1, primary changes partition config, such as add/remove secondary
    //   2, downgrade to secondary because of load balance
    //   3, disconnected from meta-server
    //   4, connected with meta-server
    //   5, crash
    // here, we just need to care about case 1, 2, 3 and 4, ignore case 5
    // the way that partition status changes is:
    //   case 1: primary -> ps_inactive & _inactive_is_transient = true -> primary
    //   case 2: primary -> ps_inactive & _inactive_is_transient = true -> secondary
    //   case 3: primary -> ps_inactive & _inactive_is_transient = true
    //   case 4: ps_inactive & _inactive_is_transient = true -> primary or secondary
    // the way we decide whether the primary stops uploading the backup checkpoint is that
    // case 1 continues uploading, all the others just stop uploading
    //
    // Per-transition side effects, keyed by (old_status, config.status).
    switch (old_status) {
    case partition_status::PS_PRIMARY:
        cleanup_preparing_mutations(false);
        switch (config.status) {
        case partition_status::PS_PRIMARY:
            replay_prepare_list();
            break;
        case partition_status::PS_INACTIVE:
            _primary_states.cleanup(old_ballot != config.ballot);
            // here we use whether the ballot changes and whether we are disconnected from
            // meta to distinguish the different cases mentioned above
            if (old_ballot == config.ballot && _stub->is_connected()) {
                // case 1 and case 2, just continue uploading
                // (in case 2, uploading is stopped when the status changes to secondary)
            } else {
                set_backup_context_cancel();
                clear_cold_backup_state();
            }
            break;
        case partition_status::PS_SECONDARY:
        case partition_status::PS_ERROR:
            _primary_states.cleanup(true);
            // only load balance will cause primary -> secondary
            // and we just stop uploading and release the cold_backup_state, and let the new
            // primary do the upload
            set_backup_context_cancel();
            clear_cold_backup_state();
            break;
        case partition_status::PS_POTENTIAL_SECONDARY:
            CHECK(false, "invalid execution path");
            break;
        default:
            CHECK(false, "invalid execution path");
        }
        break;
    case partition_status::PS_SECONDARY:
        cleanup_preparing_mutations(false);
        if (config.status != partition_status::PS_SECONDARY) {
            // if primary change the ballot, secondary will update ballot from A to
            // A+1, we don't need clear cold backup context when this case
            //
            // if secondary upgrade to primary, we must cancel & clear cold_backup_state, because
            // new-primary must check whether backup is already completed by previous-primary

            set_backup_context_cancel();
            clear_cold_backup_state();
        }
        switch (config.status) {
        case partition_status::PS_PRIMARY:
            init_group_check();
            replay_prepare_list();
            break;
        case partition_status::PS_SECONDARY:
            break;
        case partition_status::PS_POTENTIAL_SECONDARY:
            // prevent further 2pc
            // wait next group check or explicit learn for real learning
            _potential_secondary_states.learning_status = learner_status::LearningWithoutPrepare;
            break;
        case partition_status::PS_INACTIVE:
            break;
        case partition_status::PS_ERROR:
            // _secondary_states.cleanup(true); => do it in close as it may block
            break;
        default:
            CHECK(false, "invalid execution path");
        }
        break;
    case partition_status::PS_POTENTIAL_SECONDARY:
        switch (config.status) {
        case partition_status::PS_PRIMARY:
            CHECK(false, "invalid execution path");
            break;
        case partition_status::PS_SECONDARY:
            _prepare_list->truncate(_app->last_committed_decree());

            // using force cleanup now as all tasks must be done already
            CHECK_PREFIX_MSG(_potential_secondary_states.cleanup(true),
                             "potential secondary context cleanup failed");

            check_state_completeness();
            break;
        case partition_status::PS_POTENTIAL_SECONDARY:
            break;
        case partition_status::PS_INACTIVE:
            break;
        case partition_status::PS_ERROR:
            _prepare_list->reset(_app->last_committed_decree());
            // Non-forced cleanup; its result is deliberately not checked here.
            _potential_secondary_states.cleanup(false);
            // => do this in close as it may block
            // CHECK_PREFIX_MSG(_potential_secondary_states.cleanup(true),
            //                  "potential secondary context cleanup failed");
            break;
        default:
            CHECK(false, "invalid execution path");
        }
        break;
    case partition_status::PS_PARTITION_SPLIT:
        switch (config.status) {
        case partition_status::PS_PRIMARY:
            _split_states.cleanup(true);
            init_group_check();
            replay_prepare_list();
            break;
        case partition_status::PS_SECONDARY:
            _split_states.cleanup(true);
            break;
        case partition_status::PS_POTENTIAL_SECONDARY:
            CHECK(false, "invalid execution path");
            break;
        case partition_status::PS_INACTIVE:
            // Nothing to do; PS_PARTITION_SPLIT -> PS_INACTIVE is already rejected by the
            // validation switch above.
            break;
        case partition_status::PS_ERROR:
            _split_states.cleanup(false);
            break;
        default:
            CHECK(false, "invalid execution path");
        }
        break;
    case partition_status::PS_INACTIVE:
        if (config.status != partition_status::PS_PRIMARY || !_inactive_is_transient) {
            // except for case 1, we need to stop uploading the backup checkpoint
            set_backup_context_cancel();
            clear_cold_backup_state();
        }
        // Every transition to a different status below resets _inactive_is_transient to false.
        switch (config.status) {
        case partition_status::PS_PRIMARY:
            CHECK(_inactive_is_transient, "must be in transient state for being primary next");
            _inactive_is_transient = false;
            init_group_check();
            replay_prepare_list();
            break;
        case partition_status::PS_SECONDARY:
            CHECK(_inactive_is_transient, "must be in transient state for being secondary next");
            _inactive_is_transient = false;
            break;
        case partition_status::PS_POTENTIAL_SECONDARY:
            _inactive_is_transient = false;
            break;
        case partition_status::PS_INACTIVE:
            break;
        case partition_status::PS_ERROR:
            // => do this in close as it may block
            // if (_inactive_is_transient)
            // {
            //    _secondary_states.cleanup(true);
            // }

            if (_inactive_is_transient) {
                _primary_states.cleanup(true);
                _secondary_states.cleanup(false);
            }
            _inactive_is_transient = false;
            break;
        default:
            CHECK(false, "invalid execution path");
        }
        break;
    case partition_status::PS_ERROR:
        // NOTE(review): this case looks unreachable, because the validation switch above
        // returns false whenever old_status is PS_ERROR; it reads as a defensive guard.
        switch (config.status) {
        case partition_status::PS_PRIMARY:
            CHECK(false, "invalid execution path");
            break;
        case partition_status::PS_SECONDARY:
            CHECK(false, "invalid execution path");
            break;
        case partition_status::PS_POTENTIAL_SECONDARY:
            CHECK(false, "invalid execution path");
            break;
        case partition_status::PS_INACTIVE:
            CHECK(false, "invalid execution path");
            break;
        case partition_status::PS_ERROR:
            break;
        default:
            CHECK(false, "invalid execution path");
        }
        break;
    default:
        CHECK(false, "invalid execution path");
    }

    // Log the completed transition together with the prepare-list and app decrees, and the
    // time elapsed since the previous configuration change.
    LOG_INFO_PREFIX(
        "status change {} @ {} => {} @ {}, pre({}, {}), app({}, {}), duration = {} ms, {}",
        enum_to_string(old_status),
        old_ballot,
        enum_to_string(status()),
        get_ballot(),
        _prepare_list->max_decree(),
        _prepare_list->last_committed_decree(),
        _app->last_committed_decree(),
        _app->last_durable_decree(),
        _last_config_change_time_ms - old_ts,
        boost::lexical_cast<std::string>(_config));

    // Notify the stub of the new state. The replica is closed when it entered PS_ERROR, or
    // when it entered PS_INACTIVE with a higher ballot; in that case false is returned.
    if (status() != old_status) {
        bool is_closing =
            (status() == partition_status::PS_ERROR ||
             (status() == partition_status::PS_INACTIVE && get_ballot() > old_ballot));
        _stub->notify_replica_state_update(config, is_closing);

        if (is_closing) {
            LOG_INFO_PREFIX("being close ...");
            _stub->begin_close_replica(this);
            return false;
        }
    } else {
        // Same status, different ballot: notify without closing.
        _stub->notify_replica_state_update(config, false);
    }

    // start pending mutations if necessary
    if (status() == partition_status::PS_PRIMARY) {
        // The argument is the gap between the max prepared and the last committed decree.
        auto next = _primary_states.write_queue.next_work(
            static_cast<int>(max_prepared_decree() - last_committed_decree()));
        if (next != nullptr) {
            init_prepare(next, false);
        }

        CHECK(_primary_states.pc.__isset.hp_secondaries, "");
        // If the group is smaller than the minimum required for 2PC, drain the write queue
        // and fail every queued client request with ERR_NOT_ENOUGH_MEMBER.
        // NOTE(review): the "+ 1" presumably accounts for the primary itself -- confirm.
        if (_primary_states.pc.hp_secondaries.size() + 1 <
            _options->app_mutation_2pc_min_replica_count(_app_info.max_replica_count)) {
            std::vector<mutation_ptr> queued;
            _primary_states.write_queue.clear(queued);
            for (auto &m : queued) {
                for (auto &r : m->client_requests) {
                    response_client_write(r, ERR_NOT_ENOUGH_MEMBER);
                }
            }
        }
    }

    return true;
}