void replica::on_prepare()

in src/replica/replica_2pc.cpp [438:580]


// Handles an incoming prepare request carried by `request`.
//
// The request payload is decoded into a replica_configuration and a mutation.
// The function then, in order:
//   1. drops the request silently if its ballot is older than the local one;
//   2. updates the local configuration if the request carries a newer ballot;
//   3. rejects the request (with an error ack) if the local partition status
//      or learning state does not allow preparing;
//   4. acks ERR_OK right away if the decree is not beyond the last committed
//      decree, or if the same mutation (same decree and ballot) is already
//      logged;
//   5. otherwise puts the mutation into the prepare list and appends it to the
//      private log, with on_append_log_completed() bound as the completion
//      callback.
//
// Every exit path except the "old ballot" one and the paths that hand the
// request over to an existing mutation or to the log append either calls
// ack_prepare_message() or relies on the log-append callback.
void replica::on_prepare(dsn::message_ex *request)
{
    // NOTE(review): presumably asserts that this replica is only accessed from a
    // single thread -- confirm against the checker's implementation.
    _checker.only_one_thread_access();

    replica_configuration rconfig;
    mutation_ptr mu;
    bool pop_all_committed_mutations = false;

    {
        // Decode the request: first the configuration, then the mutation itself.
        rpc_read_stream reader(request);
        unmarshall(reader, rconfig, DSF_THRIFT_BINARY);
        mu = mutation::read_from(reader, request);
        // `split_sync_to_child` and `pop_all` are copied out of the received
        // configuration and then cleared in `rconfig` before it is used below.
        // NOTE(review): presumably these are per-request flags that merely travel
        // inside the configuration struct and must not leak into the local
        // configuration -- confirm.
        mu->set_is_sync_to_child(rconfig.split_sync_to_child);
        pop_all_committed_mutations = rconfig.pop_all;
        rconfig.split_sync_to_child = false;
        rconfig.pop_all = false;
    }

    // Note: the local variable deliberately/accidentally shares its name with the
    // `decree` type; after this line `decree` refers to the variable.
    decree decree = mu->data.header.decree;

    LOG_DEBUG_PREFIX("mutation {} on_prepare", mu->name());
    mu->_tracer->set_name(fmt::format("mutation[{}]", mu->name()));
    mu->_tracer->set_description("secondary");
    ADD_POINT(mu->_tracer);

    // The mutation header and the accompanying configuration must agree on the
    // partition id and the ballot.
    CHECK_EQ(mu->data.header.pid, rconfig.pid);
    CHECK_EQ(mu->data.header.ballot, rconfig.ballot);

    // The request belongs to an older ballot than the local one: drop it.
    if (mu->data.header.ballot < get_ballot()) {
        LOG_ERROR_PREFIX("mutation {} on_prepare skipped due to old view", mu->name());
        // No response is needed because the RPC should already have been cancelled
        // on the primary in this case.
        return;
    }

    // The request carries a newer ballot: update the local configuration first.
    // If that fails, the prepare is rejected with ERR_INVALID_STATE.
    else if (rconfig.ballot > get_ballot()) {
        if (!update_local_configuration(rconfig)) {
            LOG_ERROR_PREFIX(
                "mutation {} on_prepare failed as update local configuration failed, state = {}",
                mu->name(),
                enum_to_string(status()));
            ack_prepare_message(ERR_INVALID_STATE, mu);
            return;
        }
    }

    // status() is re-read here (rather than cached earlier) because the
    // configuration update above may have been applied in between.
    if (partition_status::PS_INACTIVE == status() || partition_status::PS_ERROR == status()) {
        LOG_ERROR_PREFIX("mutation {} on_prepare failed as invalid replica state, state = {}",
                         mu->name(),
                         enum_to_string(status()));
        // An inactive replica flagged by `_inactive_is_transient` answers with
        // ERR_INACTIVE_STATE; every other case here answers with ERR_INVALID_STATE.
        ack_prepare_message((partition_status::PS_INACTIVE == status() && _inactive_is_transient)
                                ? ERR_INACTIVE_STATE
                                : ERR_INVALID_STATE,
                            mu);
        return;
    } else if (partition_status::PS_POTENTIAL_SECONDARY == status()) {
        // The learner signature in the request differs from the local learning
        // version: report a learning error and reject the prepare.
        // (Original note: "new learning process".)
        if (rconfig.learner_signature != _potential_secondary_states.learning_version) {
            LOG_ERROR_PREFIX("mutation {} on_prepare failed as unmatched learning signature, state "
                             "= {}, old_signature[{:#018x}] vs new_signature[{:#018x}]",
                             mu->name(),
                             enum_to_string(status()),
                             _potential_secondary_states.learning_version,
                             rconfig.learner_signature);
            handle_learning_error(ERR_INVALID_STATE, false);
            ack_prepare_message(ERR_INVALID_STATE, mu);
            return;
        }

        // A potential secondary only accepts prepares while its learning status is
        // LearningWithPrepare or LearningSucceeded.
        auto learning_status = _potential_secondary_states.learning_status;
        if (learning_status != learner_status::LearningWithPrepare &&
            learning_status != learner_status::LearningSucceeded) {
            // If prepare requests are received while the learning status is changing
            // from LearningWithoutPrepare to LearningWithPrepare, ack ERR_TRY_AGAIN;
            // any other learning status is acked with ERR_INVALID_STATE.
            error_code ack_code =
                (learning_status == learner_status::LearningWithoutPrepare ? ERR_TRY_AGAIN
                                                                           : ERR_INVALID_STATE);
            LOG_ERROR_PREFIX(
                "mutation {} on_prepare skipped as invalid learning status, state = {}, "
                "learning_status = {}, ack {}",
                mu->name(),
                enum_to_string(status()),
                enum_to_string(learning_status),
                ack_code);
            ack_prepare_message(ack_code, mu);
            return;
        }
    }

    // From here on the status in the request must match the local status.
    CHECK_EQ(rconfig.status, status());
    // The decree is not beyond the last committed decree: nothing to prepare,
    // acknowledge success immediately.
    if (decree <= last_committed_decree()) {
        ack_prepare_message(ERR_OK, mu);
        return;
    }

    // real prepare start
    // NOTE(review): presumably keeps the local unique timestamp generator from
    // falling behind the timestamp carried by the mutation -- confirm against
    // try_update().
    _uniq_timestamp_us.try_update(mu->data.header.timestamp);
    // Duplicate detection: a mutation with the same decree and the same ballot is
    // already in the prepare list.
    auto mu2 = _prepare_list->get_mutation_by_decree(decree);
    if (mu2 != nullptr && mu2->data.header.ballot == mu->data.header.ballot) {
        if (mu2->is_logged()) {
            // already logged, just respond with ERR_OK
            ack_prepare_message(ERR_OK, mu);
        } else {
            // not logged yet: attach this duplicate request to the existing mutation
            // instead of acking it here
            mu2->add_prepare_request(request);
        }
        return;
    }

    // Hand the mutation to the prepare list; any failure here is fatal.
    error_code err = _prepare_list->prepare(mu, status(), pop_all_committed_mutations);
    CHECK_EQ_MSG(err, ERR_OK, "prepare mutation failed");

    if (partition_status::PS_POTENTIAL_SECONDARY == status() ||
        partition_status::PS_SECONDARY == status()) {
        // The prepared decree must not run further ahead of the last committed
        // decree than FLAGS_max_mutation_count_in_prepare_list.
        CHECK_LE_MSG(mu->data.header.decree,
                     last_committed_decree() + FLAGS_max_mutation_count_in_prepare_list,
                     "last_committed_decree: {}, FLAGS_max_mutation_count_in_prepare_list: {}",
                     last_committed_decree(),
                     FLAGS_max_mutation_count_in_prepare_list);
    } else {
        // NOTE(review): this rejection happens after _prepare_list->prepare() has
        // already been called with `mu` above.
        LOG_ERROR_PREFIX("mutation {} on_prepare failed as invalid replica state, state = {}",
                         mu->name(),
                         enum_to_string(status()));
        ack_prepare_message(ERR_INVALID_STATE, mu);
        return;
    }

    // While the split manager reports a split in progress, pass the mutation to it
    // as well.
    if (_split_mgr->is_splitting()) {
        _split_mgr->copy_mutation(mu);
    }

    // The mutation must not have a log task yet. Append it to the private log; the
    // completion callback is on_append_log_completed() bound to this replica and
    // `mu`, with the two remaining arguments supplied by the log when it invokes
    // the callback. The task is scheduled with the partition's thread hash.
    CHECK(mu->log_task() == nullptr, "");
    mu->log_task() = _private_log->append(mu,
                                          LPC_WRITE_REPLICATION_LOG,
                                          &_tracker,
                                          std::bind(&replica::on_append_log_completed,
                                                    this,
                                                    mu,
                                                    std::placeholders::_1,
                                                    std::placeholders::_2),
                                          get_gpid().thread_hash());
    CHECK_NOTNULL(mu->log_task(), "");
}