in src/enclave/enclave.h [247:487]
// Body of the enclave's main thread.
//
// Registers the ringbuffer message handlers (stop, tick, inbound node
// messages, ledger entry responses and the RPC session handlers), starts
// joining or ledger recovery depending on start_type, then alternates between
// reading inbound ringbuffer messages and running queued thread messages
// until a stop message marks the buffer processor as finished.
//
// Returns true once the loop has ended and AdminMessage::stopped has been
// written to the host. Unless VIRTUAL_ENCLAVE is defined, a std::exception
// escaping the loop is reported to the host as AdminMessage::fatal_error_msg
// and false is returned instead.
bool run_main()
{
  LOG_DEBUG_FMT("Running main thread");
#ifndef VIRTUAL_ENCLAVE
  try
#endif
  {
    messaging::BufferProcessor bp("Enclave");

    // reconstruct oversized messages sent to the enclave
    oversized::FragmentReconstructor fr(bp.get_dispatcher());

    // stop: end both the ringbuffer processing loop below and the thread
    // messaging subsystem
    DISPATCHER_SET_MESSAGE_HANDLER(
      bp, AdminMessage::stop, [&bp](const uint8_t*, size_t) {
        bp.set_finished();
        threading::ThreadMessaging::thread_messaging.set_finished();
      });

    last_tick_time = enclave::get_enclave_time();

    // tick: report message statistics to the host, then advance every
    // time-driven component by the number of whole milliseconds elapsed
    DISPATCHER_SET_MESSAGE_HANDLER(
      bp, AdminMessage::tick, [this, &bp](const uint8_t*, size_t) {
        const auto message_counts =
          bp.get_dispatcher().retrieve_message_counts();
        const auto j =
          bp.get_dispatcher().convert_message_counts(message_counts);
        RINGBUFFER_WRITE_MESSAGE(
          AdminMessage::work_stats, to_host, j.dump());

        const auto time_now = enclave::get_enclave_time();
        ringbuffer_logger->set_time(time_now);

        const auto elapsed_ms =
          std::chrono::duration_cast<std::chrono::milliseconds>(
            time_now - last_tick_time);
        if (elapsed_ms.count() > 0)
        {
          // Advance by exactly the time being reported rather than snapping
          // to time_now: duration_cast truncates, so any sub-millisecond
          // remainder must be carried over to the next tick or it is lost
          // and the ticked components gradually fall behind enclave time
          last_tick_time += elapsed_ms;

          node->tick(elapsed_ms);
          context->historical_state_cache->tick(elapsed_ms);
          threading::ThreadMessaging::thread_messaging.tick(elapsed_ms);

          // When recovering, no signature should be emitted while the
          // public ledger is being read
          if (!node->is_reading_public_ledger())
          {
            for (auto& [actor, frontend] : rpc_map->frontends())
            {
              frontend->tick(elapsed_ms);
            }
          }

          node->tick_end();
        }
      });

    // node_inbound: forward the raw message to the node
    DISPATCHER_SET_MESSAGE_HANDLER(
      bp, ccf::node_inbound, [this](const uint8_t* data, size_t size) {
        node->recv_node_inbound(data, size);
      });

    // ledger_entry_range: ledger entries returned by the host, routed
    // according to the purpose they were requested for
    DISPATCHER_SET_MESSAGE_HANDLER(
      bp,
      consensus::ledger_entry_range,
      [this](const uint8_t* data, size_t size) {
        const auto [from_seqno, to_seqno, purpose, body] =
          ringbuffer::read_message<consensus::ledger_entry_range>(
            data, size);
        switch (purpose)
        {
          case consensus::LedgerRequestPurpose::Recovery:
          {
            // A mismatched range is logged but the entry is still processed
            if (from_seqno != to_seqno)
            {
              LOG_FAIL_FMT(
                "Unexpected range for Recovery response "
                "ledger_entry_range: {}->{} "
                "(expected single ledger entry)",
                from_seqno,
                to_seqno);
            }

            if (
              node->is_reading_public_ledger() ||
              node->is_verifying_snapshot())
            {
              node->recover_public_ledger_entry(body);
            }
            else if (node->is_reading_private_ledger())
            {
              node->recover_private_ledger_entry(body);
            }
            else
            {
              auto [s, _, __] = node->state();
              LOG_FAIL_FMT(
                "Cannot recover ledger entry: Unexpected node state {}", s);
            }
            break;
          }
          case consensus::LedgerRequestPurpose::HistoricalQuery:
          {
            context->historical_state_cache->handle_ledger_entries(
              from_seqno, to_seqno, body);
            break;
          }
          default:
          {
            LOG_FAIL_FMT("Unhandled purpose: {}", purpose);
            break;
          }
        }
      });

    // ledger_no_entry_range: the host has no entries for the requested
    // range
    DISPATCHER_SET_MESSAGE_HANDLER(
      bp,
      consensus::ledger_no_entry_range,
      [this](const uint8_t* data, size_t size) {
        const auto [from_seqno, to_seqno, purpose] =
          ringbuffer::read_message<consensus::ledger_no_entry_range>(
            data, size);
        switch (purpose)
        {
          case consensus::LedgerRequestPurpose::Recovery:
          {
            // A mismatched range is logged but the response is still acted on
            if (from_seqno != to_seqno)
            {
              LOG_FAIL_FMT(
                "Unexpected range for Recovery response "
                "ledger_no_entry_range: {}->{} "
                "(expected single ledger entry)",
                from_seqno,
                to_seqno);
            }

            if (node->is_verifying_snapshot())
            {
              node->verify_snapshot_end();
            }
            else
            {
              node->recover_ledger_end();
            }
            break;
          }
          case consensus::LedgerRequestPurpose::HistoricalQuery:
          {
            context->historical_state_cache->handle_no_entry_range(
              from_seqno, to_seqno);
            break;
          }
          default:
          {
            LOG_FAIL_FMT("Unhandled purpose: {}", purpose);
            break;
          }
        }
      });

    rpcsessions->register_message_handlers(bp.get_dispatcher());

    if (start_type == StartType::Join)
    {
      // When joining from a snapshot, deserialise ledger suffix to verify
      // snapshot evidence. Otherwise, attempt to join straight away
      if (node->is_verifying_snapshot())
      {
        node->start_ledger_recovery();
      }
      else
      {
        node->join();
      }
    }
    else if (start_type == StartType::Recover)
    {
      node->start_ledger_recovery();
    }

    // Maximum number of inbound ringbuffer messages which will be
    // processed in a single iteration
    static constexpr size_t max_messages = 256;

    size_t consecutive_idles = 0u;

    // Enclave time at which the current run of idle iterations began. Only
    // read after being assigned on the first idle iteration of a run
    std::chrono::microseconds idling_start_time{};

    while (!bp.get_finished())
    {
      // First, read some messages from the ringbuffer
      auto read = bp.read_n(max_messages, circuit->read_from_outside());

      // Then, execute some thread messages
      size_t thread_msg = 0;
      while (thread_msg < max_messages &&
             threading::ThreadMessaging::thread_messaging.run_one())
      {
        thread_msg++;
      }

      // If no messages were read from the ringbuffer and no thread
      // messages were executed, idle
      if (read == 0 && thread_msg == 0)
      {
        const auto time_now = enclave::get_enclave_time();

        if (consecutive_idles == 0)
        {
          idling_start_time = time_now;
        }

        // Handle initial idles by pausing, eventually sleep (in host)
        constexpr std::chrono::milliseconds timeout(5);
        if ((time_now - idling_start_time) > timeout)
        {
          std::this_thread::sleep_for(timeout * 10);
        }
        else
        {
          CCF_PAUSE();
        }

        consecutive_idles++;
      }
      else
      {
        // If some messages were read, reset consecutive idles count
        consecutive_idles = 0;
      }
    }

    LOG_INFO_FMT("Enclave stopped successfully. Stopping host...");
    RINGBUFFER_WRITE_MESSAGE(AdminMessage::stopped, to_host);

    return true;
  }
#ifndef VIRTUAL_ENCLAVE
  catch (const std::exception& e)
  {
    // It is expected that all enclave modules consuming ring buffer
    // messages catch any thrown exception they can recover from. Uncaught
    // exceptions bubble up to here and cause the node to shutdown.
    RINGBUFFER_WRITE_MESSAGE(
      AdminMessage::fatal_error_msg, to_host, std::string(e.what()));
    return false;
  }
#endif
}