bool run_main()

in src/enclave/enclave.h [247:487]


    bool run_main()
    {
      LOG_DEBUG_FMT("Running main thread");
#ifndef VIRTUAL_ENCLAVE
      try
#endif
      {
        messaging::BufferProcessor bp("Enclave");

        // Reconstruct oversized messages sent to the enclave
        oversized::FragmentReconstructor fr(bp.get_dispatcher());

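        // Stop handler: mark both the buffer processor and thread messaging
        // as finished so that the main loop below exits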
        DISPATCHER_SET_MESSAGE_HANDLER(
          bp, AdminMessage::stop, [&bp](const uint8_t*, size_t) {
            bp.set_finished();
            threading::ThreadMessaging::thread_messaging.set_finished();
          });

        last_tick_time = enclave::get_enclave_time();

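        // Tick handler: report message-handling statistics to the host, then
        // advance time in the node, the historical query cache, thread
        // messaging and the RPC frontends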
        DISPATCHER_SET_MESSAGE_HANDLER(
          bp, AdminMessage::tick, [this, &bp](const uint8_t*, size_t) {
            const auto message_counts =
              bp.get_dispatcher().retrieve_message_counts();
            const auto j =
              bp.get_dispatcher().convert_message_counts(message_counts);
            RINGBUFFER_WRITE_MESSAGE(
              AdminMessage::work_stats, to_host, j.dump());

            const auto time_now = enclave::get_enclave_time();
            ringbuffer_logger->set_time(time_now);

            const auto elapsed_ms =
              std::chrono::duration_cast<std::chrono::milliseconds>(
                time_now - last_tick_time);
            if (elapsed_ms.count() > 0)
            {
              last_tick_time = time_now;

              node->tick(elapsed_ms);
              context->historical_state_cache->tick(elapsed_ms);
              threading::ThreadMessaging::thread_messaging.tick(elapsed_ms);
              // When recovering, no signature should be emitted while the
              // public ledger is being read
              if (!node->is_reading_public_ledger())
              {
                for (auto& [actor, frontend] : rpc_map->frontends())
                {
                  frontend->tick(elapsed_ms);
                }
              }
              node->tick_end();
            }
          });

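        // Deliver node-to-node messages forwarded by the host to the node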
        DISPATCHER_SET_MESSAGE_HANDLER(
          bp, ccf::node_inbound, [this](const uint8_t* data, size_t size) {
            node->recv_node_inbound(data, size);
          });

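        // A range of ledger entries read by the host; dispatched according to
        // the purpose of the original request (recovery or historical query)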
        DISPATCHER_SET_MESSAGE_HANDLER(
          bp,
          consensus::ledger_entry_range,
          [this](const uint8_t* data, size_t size) {
            const auto [from_seqno, to_seqno, purpose, body] =
              ringbuffer::read_message<consensus::ledger_entry_range>(
                data, size);
            switch (purpose)
            {
              case consensus::LedgerRequestPurpose::Recovery:
              {
                if (from_seqno != to_seqno)
                {
                  LOG_FAIL_FMT(
                    "Unexpected range for Recovery response "
                    "ledger_entry_range: {}->{} "
                    "(expected single ledger entry)",
                    from_seqno,
                    to_seqno);
                }
                if (
                  node->is_reading_public_ledger() ||
                  node->is_verifying_snapshot())
                {
                  node->recover_public_ledger_entry(body);
                }
                else if (node->is_reading_private_ledger())
                {
                  node->recover_private_ledger_entry(body);
                }
                else
                {
                  auto [s, _, __] = node->state();
                  LOG_FAIL_FMT(
                    "Cannot recover ledger entry: Unexpected node state {}", s);
                }
                break;
              }
              case consensus::LedgerRequestPurpose::HistoricalQuery:
              {
                context->historical_state_cache->handle_ledger_entries(
                  from_seqno, to_seqno, body);
                break;
              }
              default:
              {
                LOG_FAIL_FMT("Unhandled purpose: {}", purpose);
              }
            }
          });

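        // The host could not provide the requested ledger range: complete
        // recovery or snapshot verification, or notify the historical query
        // cache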
        DISPATCHER_SET_MESSAGE_HANDLER(
          bp,
          consensus::ledger_no_entry_range,
          [this](const uint8_t* data, size_t size) {
            const auto [from_seqno, to_seqno, purpose] =
              ringbuffer::read_message<consensus::ledger_no_entry_range>(
                data, size);
            switch (purpose)
            {
              case consensus::LedgerRequestPurpose::Recovery:
              {
                if (from_seqno != to_seqno)
                {
                  LOG_FAIL_FMT(
                    "Unexpected range for Recovery response "
                    "ledger_no_entry_range: {}->{} "
                    "(expected single ledger entry)",
                    from_seqno,
                    to_seqno);
                }
                if (node->is_verifying_snapshot())
                {
                  node->verify_snapshot_end();
                }
                else
                {
                  node->recover_ledger_end();
                }
                break;
              }
              case consensus::LedgerRequestPurpose::HistoricalQuery:
              {
                context->historical_state_cache->handle_no_entry_range(
                  from_seqno, to_seqno);
                break;
              }
              default:
              {
                LOG_FAIL_FMT("Unhandled purpose: {}", purpose);
              }
            }
          });

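        // Register handlers for RPC session traffic forwarded by the host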
        rpcsessions->register_message_handlers(bp.get_dispatcher());

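        // Only Join and Recover require additional start-up work before
        // entering the main loop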
        if (start_type == StartType::Join)
        {
          // When joining from a snapshot, deserialise the ledger suffix to
          // verify the snapshot evidence. Otherwise, attempt to join straight
          // away
          if (node->is_verifying_snapshot())
          {
            node->start_ledger_recovery();
          }
          else
          {
            node->join();
          }
        }
        else if (start_type == StartType::Recover)
        {
          node->start_ledger_recovery();
        }

        // Maximum number of inbound ringbuffer messages which will be
        // processed in a single iteration
        static constexpr size_t max_messages = 256;

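        // Main loop: alternate between reading ringbuffer messages and running
        // thread messages, backing off progressively when idle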
        size_t consecutive_idles = 0u;
        while (!bp.get_finished())
        {
          // First, read some messages from the ringbuffer
          auto read = bp.read_n(max_messages, circuit->read_from_outside());

          // Then, execute some thread messages
          size_t thread_msg = 0;
          while (thread_msg < max_messages &&
                 threading::ThreadMessaging::thread_messaging.run_one())
          {
            thread_msg++;
          }

          // If no messages were read from the ringbuffer and no thread
          // messages were executed, idle
          if (read == 0 && thread_msg == 0)
          {
            const auto time_now = enclave::get_enclave_time();
            static std::chrono::microseconds idling_start_time;

            if (consecutive_idles == 0)
            {
              idling_start_time = time_now;
            }

            // Handle initial idles by pausing; eventually sleep (in the host)
            constexpr std::chrono::milliseconds timeout(5);
            if ((time_now - idling_start_time) > timeout)
            {
              std::this_thread::sleep_for(timeout * 10);
            }
            else
            {
              CCF_PAUSE();
            }

            consecutive_idles++;
          }
          else
          {
            // If any messages were processed, reset the consecutive idles
            // count
            consecutive_idles = 0;
          }
        }

        LOG_INFO_FMT("Enclave stopped successfully. Stopping host...");
        RINGBUFFER_WRITE_MESSAGE(AdminMessage::stopped, to_host);

        return true;
      }
#ifndef VIRTUAL_ENCLAVE
      catch (const std::exception& e)
      {
        // It is expected that all enclave modules consuming ringbuffer
        // messages catch any thrown exception they can recover from. Uncaught
        // exceptions bubble up to here and cause the node to shut down.
        RINGBUFFER_WRITE_MESSAGE(
          AdminMessage::fatal_error_msg, to_host, std::string(e.what()));
        return false;
      }
#endif
    }
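
The idle path at the end of the main loop spins with CCF_PAUSE for the first ~5 ms of consecutive idleness and then backs off to ~50 ms sleeps, so the enclave stays responsive under load but stops burning a host core once it has gone quiet. Below is a minimal, standalone sketch of the same back-off pattern, using only the standard library (steady_clock in place of enclave time) and hypothetical work_available() / pause_cpu() placeholders for the enclave's real primitives:

#include <atomic>
#include <chrono>
#include <cstddef>
#include <thread>

// Hypothetical placeholders for the enclave's real primitives (illustration only)
bool work_available()
{
  return false; // would poll the ringbuffer and run thread messages
}

void pause_cpu()
{
  std::this_thread::yield(); // stands in for CCF_PAUSE()
}

void run_loop(std::atomic<bool>& finished)
{
  using namespace std::chrono;
  constexpr milliseconds spin_budget(5); // mirrors the 5ms timeout above

  size_t consecutive_idles = 0;
  steady_clock::time_point idling_start;

  while (!finished)
  {
    if (work_available())
    {
      consecutive_idles = 0; // work was done: reset the idle streak
      continue;
    }

    if (consecutive_idles++ == 0)
    {
      idling_start = steady_clock::now(); // start of a new idle streak
    }

    if (steady_clock::now() - idling_start > spin_budget)
    {
      std::this_thread::sleep_for(spin_budget * 10); // long idle: sleep ~50 ms
    }
    else
    {
      pause_cpu(); // short idle: keep latency low
    }
  }
}

int main()
{
  std::atomic<bool> finished{false};
  std::thread worker(run_loop, std::ref(finished));
  std::this_thread::sleep_for(std::chrono::milliseconds(100));
  finished = true; // plays the role of AdminMessage::stop
  worker.join();
}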