in src/node/node_state.h [412:637]
/**
 * Start one attempt to join an existing service through the node at
 * config.join.target_rpc_address.
 *
 * Steps performed here:
 *  1. Build client TLS credentials from config.join.service_cert (used as
 *     the CA), this node's self-signed certificate and its signing key.
 *  2. Create a client session, connect to the target, and register the
 *     response callback described below.
 *  3. Send a request to the "join" endpoint under the nodes actor prefix,
 *     carrying this node's network info, public encryption key, quote,
 *     consensus type, startup seqno and a certificate signing request.
 *
 * Response callback (runs with `lock` held):
 *  - Returns false without further action unless the state machine is in
 *    State::pending.
 *  - On a non-OK status: for a permanent redirect carrying a Location
 *    header, config.join.target_rpc_address is rewritten to the redirect
 *    target; any other failure is logged. Returns false in both cases.
 *  - On a response that cannot be converted to JoinNetworkNodeToNode::Out:
 *    logs and returns false.
 *  - On TRUSTED/LEARNER: installs network identity and ledger secrets, sets
 *    up consensus, optionally applies the startup snapshot, initialises
 *    consensus as a backup and advances the state machine to
 *    partOfPublicNetwork or partOfNetwork. Returns true.
 *  - On PENDING: logs only. Returns true.
 *
 * NOTE(review): the meaning of the callback's boolean result to the client
 * session is not visible in this function - confirm against the client API.
 *
 * @throws std::logic_error (from the callback) if a TRUSTED/LEARNER response
 *   carries no network info, if the peer's consensus type differs from
 *   network.consensus_type, or if no signature is found after applying the
 *   startup snapshot.
 */
void initiate_join()
{
  auto network_ca = std::make_shared<tls::CA>(config.join.service_cert);
  auto join_client_cert = std::make_unique<tls::Cert>(
    network_ca,
    self_signed_node_cert,
    node_sign_kp->private_key_pem(),
    config.join.target_rpc_address);

  // Create RPC client and connect to remote node
  auto join_client =
    rpcsessions->create_client(std::move(join_client_cert));

  auto [target_host, target_port] =
    split_net_address(config.join.target_rpc_address);

  join_client->connect(
    target_host,
    target_port,
    [this](
      http_status status,
      http::HeaderMap&& headers,
      std::vector<uint8_t>&& data) {
      std::lock_guard<std::mutex> guard(lock);

      // Only a node that is still pending acts on a join response.
      if (!sm.check(State::pending))
      {
        return false;
      }

      if (status != HTTP_STATUS_OK)
      {
        const auto& location = headers.find(http::headers::LOCATION);
        if (
          status == HTTP_STATUS_PERMANENT_REDIRECT &&
          location != headers.end())
        {
          // Remember the redirect target. NOTE(review): presumably a later
          // call to initiate_join() then contacts the new address - this
          // callback itself does not retry.
          const auto& url = http::parse_url_full(location->second);
          config.join.target_rpc_address =
            make_net_address(url.host, url.port);
          LOG_INFO_FMT("Target node redirected to {}", location->second);
        }
        else
        {
          LOG_FAIL_FMT(
            "An error occurred while joining the network: {} {}{}",
            status,
            http_status_str(status),
            data.empty() ?
              "" :
              fmt::format(" '{}'", std::string(data.begin(), data.end())));
        }
        return false;
      }

      // NOTE(review): unpack() runs outside the try block below; if it can
      // throw on a malformed body, that exception leaves this callback
      // unhandled - confirm whether that is intended.
      auto j = serdes::unpack(data, serdes::Pack::Text);

      JoinNetworkNodeToNode::Out resp;
      try
      {
        resp = j.get<JoinNetworkNodeToNode::Out>();
      }
      catch (const std::exception& e)
      {
        LOG_FAIL_FMT(
          "An error occurred while parsing the join network response");
        LOG_DEBUG_FMT(
          "An error occurred while parsing the join network response: {}",
          j.dump());
        return false;
      }

      // Set network secrets, node id and become part of network.
      if (
        resp.node_status == NodeStatus::TRUSTED ||
        resp.node_status == NodeStatus::LEARNER)
      {
        // network_info is dereferenced unconditionally throughout this
        // branch, so reject a response that claims TRUSTED/LEARNER without
        // providing it rather than dereferencing an empty value.
        if (!resp.network_info)
        {
          throw std::logic_error("Expected network info in join response");
        }

        if (resp.network_info->consensus_type != network.consensus_type)
        {
          throw std::logic_error(fmt::format(
            "Enclave initiated with consensus type {} but target node "
            "responded with consensus {}",
            network.consensus_type,
            resp.network_info->consensus_type));
        }

        network.identity = std::make_unique<ReplicatedNetworkIdentity>(
          resp.network_info->identity);
        network.ledger_secrets->init_from_map(
          std::move(resp.network_info->ledger_secrets));

        crypto::Pem n2n_channels_cert;
        if (!resp.network_info->endorsed_certificate.has_value())
        {
          // Endorsed node certificate is included in join response
          // from 2.x (CFT only). When joining an existing 1.x service,
          // self-sign own certificate and use it to endorse TLS
          // connections.
          endorsed_node_cert = create_endorsed_node_cert(
            default_node_cert_validity_period_days);
          history->set_endorsed_certificate(endorsed_node_cert.value());
          n2n_channels_cert = endorsed_node_cert.value();
          open_frontend(ActorsType::members);
          open_user_frontend();
          accept_network_tls_connections();
        }
        else
        {
          // Only the node-to-node channel certificate is taken from the
          // response here; frontends are not opened on this path.
          n2n_channels_cert =
            resp.network_info->endorsed_certificate.value();
        }

        // Fields absent from the response fall back to OPENING and
        // ONE_TRANSACTION respectively.
        setup_consensus(
          resp.network_info->service_status.value_or(
            ServiceStatus::OPENING),
          resp.network_info->reconfiguration_type.value_or(
            ReconfigurationType::ONE_TRANSACTION),
          resp.network_info->public_only,
          n2n_channels_cert);
        auto_refresh_jwt_keys();

        if (resp.network_info->public_only)
        {
          last_recovered_signed_idx =
            resp.network_info->last_recovered_signed_idx;
          setup_recovery_hook();
          snapshotter->set_snapshot_generation(false);
        }

        View view = VIEW_UNKNOWN;
        std::vector<kv::Version> view_history = {};
        if (startup_snapshot_info)
        {
          // It is only possible to deserialise the entire snapshot then,
          // once the ledger secrets have been passed in by the network
          kv::ConsensusHookPtrs hooks;
          deserialise_snapshot(
            network.tables,
            startup_snapshot_info->raw,
            hooks,
            &view_history,
            resp.network_info->public_only,
            startup_snapshot_info->evidence_seqno);

          for (auto& hook : hooks)
          {
            hook->call(consensus.get());
          }

          // The view to resume from is read from the signature stored in
          // the snapshot that was just applied.
          auto tx = network.tables->create_read_only_tx();
          auto signatures = tx.ro(network.signatures);
          auto sig = signatures->get();
          if (!sig.has_value())
          {
            throw std::logic_error(
              "No signatures found after applying snapshot");
          }
          view = sig->view;

          if (!resp.network_info->public_only)
          {
            // Only clear snapshot if not recovering. When joining the
            // public network the snapshot is used later to initialise the
            // recovery store
            startup_snapshot_info.reset();
          }

          LOG_INFO_FMT(
            "Joiner successfully resumed from snapshot at seqno {} and "
            "view {}",
            network.tables->current_version(),
            view);
        }

        consensus->init_as_backup(
          network.tables->current_version(), view, view_history);

        if (resp.network_info->public_only)
        {
          sm.advance(State::partOfPublicNetwork);
        }
        else
        {
          // NOTE(review): presumably releases the quote buffers now that
          // the node has joined - confirm against reset_data().
          reset_data(quote_info.quote);
          reset_data(quote_info.endorsements);
          sm.advance(State::partOfNetwork);
        }

        LOG_INFO_FMT(
          "Node has now joined the network as node {}: {}",
          self,
          (resp.network_info->public_only ? "public only" : "all domains"));
      }
      else if (resp.node_status == NodeStatus::PENDING)
      {
        LOG_INFO_FMT(
          "Node {} is waiting for votes of members to be trusted", self);
      }

      return true;
    });

  // Send RPC request to remote node to join the network.
  JoinNetworkNodeToNode::In join_params;
  join_params.node_info_network = config.network;
  join_params.public_encryption_key =
    node_encrypt_kp->public_key_pem().raw();
  join_params.quote_info = quote_info;
  join_params.consensus_type = network.consensus_type;
  join_params.startup_seqno = startup_seqno;
  join_params.certificate_signing_request = node_sign_kp->create_csr(
    config.node_certificate.subject_name, subject_alt_names);

  LOG_DEBUG_FMT(
    "Sending join request to {}", config.join.target_rpc_address);

  const auto body = serdes::pack(join_params, serdes::Pack::Text);

  http::Request r(fmt::format(
    "/{}/{}", ccf::get_actor_prefix(ccf::ActorsType::nodes), "join"));
  r.set_header(
    http::headers::CONTENT_TYPE, http::headervalues::contenttype::JSON);
  // The request is given a pointer to `body`, so `body` must stay alive
  // until build_request() has run on the next line.
  r.set_body(&body);

  join_client->send_request(r.build_request());
}