// void replica::init_prepare()
//
// in src/replica/replica_2pc.cpp [237:385]

// Primary-side entry point of the two-phase commit (2PC) for one mutation:
// assigns the mutation its (ballot, decree) identity, runs the admission
// checks (bounded staleness, bulk-load membership, quorum), prepares it
// locally, broadcasts PREPARE to secondaries and eligible learners, forwards
// it to a splitting child if any, and appends it to the private log.
//
// Parameters:
//   mu             - the mutation to prepare; its header/timestamps are
//                    filled in here.
//   reconciliation - when true, the quorum ("enough members") check is
//                    skipped so that every previously-prepared mutation can
//                    still be committed (see the PacificA paper).
//   pop_all_committed_mutations - passed through to the prepare list and to
//                    the PREPARE messages sent to remote replicas.
//
// On any admission/prepare failure, jumps to ErrOut and answers every
// pending client request with the error; nothing is sent or logged.
void replica::init_prepare(mutation_ptr &mu, bool reconciliation, bool pop_all_committed_mutations)
{
    // Only the primary replica may initiate a prepare.
    CHECK_EQ(partition_status::PS_PRIMARY, status());

    mu->_tracer->set_description("primary");
    ADD_POINT(mu->_tracer);

    error_code err = ERR_OK;
    // Counter for learners that receive this PREPARE.
    // NOTE(review): uint8_t caps at 255 — assumes fewer than 256 learners;
    // confirm this bound holds elsewhere.
    uint8_t count = 0;
    const auto request_count = mu->client_requests.size();
    // Stamp the primary's committed point into the header so receivers can
    // advance their own commit decree.
    mu->data.header.last_committed_decree = last_committed_decree();

    dsn_log_level_t level = LOG_LEVEL_DEBUG;
    if (mu->data.header.decree == invalid_decree) {
        // Fresh mutation: allocate the next decree from the prepare list and
        // give it a unique timestamp.
        mu->set_id(get_ballot(), _prepare_list->max_decree() + 1);
        // print a debug log if necessary: periodically promote the log level
        // so progress is visible without flooding the logs.
        if (FLAGS_prepare_decree_gap_for_debug_logging > 0 &&
            mu->get_decree() % FLAGS_prepare_decree_gap_for_debug_logging == 0)
            level = LOG_LEVEL_INFO;
        mu->set_timestamp(_uniq_timestamp_us.next());
    } else {
        // Mutation already carries a decree (e.g. re-prepare); keep it and
        // only refresh the ballot.
        mu->set_id(get_ballot(), mu->data.header.decree);
    }

    mu->_tracer->set_name(fmt::format("mutation[{}]", mu->name()));
    dlog(level,
         "%s: mutation %s init_prepare, mutation_tid=%" PRIu64,
         name(),
         mu->name(),
         mu->tid());

    // child should prepare mutation synchronously
    mu->set_is_sync_to_child(_primary_states.sync_send_write_request);

    // check bounded staleness: refuse to prepare if this decree runs too far
    // ahead of the committed point.
    if (mu->data.header.decree > last_committed_decree() + FLAGS_staleness_for_commit) {
        err = ERR_CAPACITY_EXCEEDED;
        goto ErrOut;
    }

    // stop prepare bulk load ingestion if there are secondaries unalive.
    // Only the leading bulk-load updates are inspected: the loop exits at the
    // first non-bulk-load update. A bulk-load ingestion requires the full
    // replica set (primary + all secondaries) to be present.
    for (auto i = 0; i < request_count; ++i) {
        const mutation_update &update = mu->data.updates[i];
        if (update.code != dsn::apps::RPC_RRDB_RRDB_BULK_LOAD) {
            break;
        }
        LOG_INFO_PREFIX("try to prepare bulk load mutation({})", mu->name());
        if (static_cast<int>(_primary_states.membership.secondaries.size()) + 1 <
            _primary_states.membership.max_replica_count) {
            err = ERR_NOT_ENOUGH_MEMBER;
            break;
        }
    }
    if (err != ERR_OK) {
        goto ErrOut;
    }

    // stop prepare if there are too few replicas unless it's a reconciliation
    // for reconciliation, we should ensure every prepared mutation to be committed
    // please refer to PacificA paper
    if (static_cast<int>(_primary_states.membership.secondaries.size()) + 1 <
            _options->app_mutation_2pc_min_replica_count(_app_info.max_replica_count) &&
        !reconciliation) {
        err = ERR_NOT_ENOUGH_MEMBER;
        goto ErrOut;
    }

    // By construction above, the decree must be ahead of the committed point.
    CHECK_GT(mu->data.header.decree, last_committed_decree());

    // local prepare: insert into the primary's own prepare list first.
    err = _prepare_list->prepare(mu, partition_status::PS_PRIMARY, pop_all_committed_mutations);
    if (err != ERR_OK) {
        goto ErrOut;
    }

    // remote prepare: broadcast PREPARE to every secondary and wait for that
    // many acks.
    mu->set_prepare_ts();
    mu->set_left_secondary_ack_count((unsigned int)_primary_states.membership.secondaries.size());
    for (auto it = _primary_states.membership.secondaries.begin();
         it != _primary_states.membership.secondaries.end();
         ++it) {
        send_prepare_message(*it,
                             partition_status::PS_SECONDARY,
                             mu,
                             FLAGS_prepare_timeout_ms_for_secondaries,
                             pop_all_committed_mutations);
    }

    // Also send PREPARE to learners (potential secondaries) whose learning
    // has progressed far enough that this decree falls in their prepare range.
    count = 0;
    for (auto it = _primary_states.learners.begin(); it != _primary_states.learners.end(); ++it) {
        if (it->second.prepare_start_decree != invalid_decree &&
            mu->data.header.decree >= it->second.prepare_start_decree) {
            send_prepare_message(it->first,
                                 partition_status::PS_POTENTIAL_SECONDARY,
                                 mu,
                                 FLAGS_prepare_timeout_ms_for_potential_secondaries,
                                 pop_all_committed_mutations,
                                 it->second.signature);
            count++;
        }
    }
    mu->set_left_potential_secondary_ack_count(count);

    // While splitting, the child partition gets its own copy of the mutation.
    if (_split_mgr->is_splitting()) {
        _split_mgr->copy_mutation(mu);
    }

    if (mu->is_logged()) {
        // Already durable in the log: acks may be enough to commit now.
        do_possible_commit_on_primary(mu);
    } else {
        // Not yet logged: append to the private log asynchronously;
        // on_append_log_completed drives the commit once the write lands.
        CHECK_EQ(mu->data.header.log_offset, invalid_offset);
        CHECK(mu->log_task() == nullptr, "");
        int64_t pending_size;
        mu->log_task() = _private_log->append(mu,
                                              LPC_WRITE_REPLICATION_LOG,
                                              &_tracker,
                                              std::bind(&replica::on_append_log_completed,
                                                        this,
                                                        mu,
                                                        std::placeholders::_1,
                                                        std::placeholders::_2),
                                              get_gpid().thread_hash(),
                                              &pending_size);
        CHECK_NOTNULL(mu->log_task(), "");
        // Back-pressure: when the shared log backlog grows past the
        // configured threshold, delay receiving more traffic from the
        // sessions that produced these requests.
        if (FLAGS_log_shared_pending_size_throttling_threshold_kb > 0 &&
            FLAGS_log_shared_pending_size_throttling_delay_ms > 0 &&
            pending_size >= FLAGS_log_shared_pending_size_throttling_threshold_kb * 1024) {
            int delay_ms = FLAGS_log_shared_pending_size_throttling_delay_ms;
            for (dsn::message_ex *r : mu->client_requests) {
                if (r && r->io_session->delay_recv(delay_ms)) {
                    LOG_WARNING("too large pending shared log ({}), delay traffic from {} for {} "
                                "milliseconds",
                                pending_size,
                                r->header->from_address,
                                delay_ms);
                }
            }
        }
    }

    // Remember when this prepare started (used by liveness/lease accounting).
    _primary_states.last_prepare_ts_ms = mu->prepare_ts_ms();
    return;

ErrOut:
    // Admission or local prepare failed: answer every client request with the
    // error; no PREPARE was sent and nothing was logged.
    for (auto &r : mu->client_requests) {
        response_client_write(r, err);
    }
    return;
}