void CrowService::run()

in service/http_server/crow_service.cpp [60:390]


void CrowService::run() {
  crow::SimpleApp app;

  // For adding and removing websocket connections
  std::mutex mtx;

  // Get all values
  CROW_ROUTE(app, "/v1/transactions")
  ([this](const crow::request &req, response &res) {
    uint64_t cur_time = std::chrono::duration_cast<std::chrono::milliseconds>(
      std::chrono::system_clock::now().time_since_epoch()
    ).count();

    if (cur_time < last_db_scan_time + DB_SCAN_TIMEOUT_MS) {
      res.code = 503;
      res.set_header("Content-Type", "text/plain");
      res.end("Get all transactions functionality on cooldown (" + 
        std::to_string(last_db_scan_time + DB_SCAN_TIMEOUT_MS -  cur_time) +
        " ms left)");
    } else {
      last_db_scan_time = cur_time;

      auto values = kv_client_.GetAllValues();
      if (values != nullptr) {
        //LOG(INFO) << "client getallvalues value = " << values->c_str();

        // Send updated blocks list to websocket
        if (users.size() > 0) {
          for (auto u : users)
            u->send_text("Update blocks");
        }

        num_transactions_++;

        res.set_header("Content-Type", "application/json");
        res.end(std::string(values->c_str()));
      } else {
        res.code = 500;
        res.set_header("Content-Type", "text/plain");
        res.end("getallvalues fail");
      }
    }
  });

  // Get value of specific id
  CROW_ROUTE(app, "/v1/transactions/<string>")
  ([this](const crow::request &req, response &res, std::string id) {
    auto value = kv_client_.Get(id);
    if (value != nullptr) {
      LOG(INFO) << "client get value = " << value->c_str();

      // Send updated blocks list to websocket
      if (users.size() > 0) {
        for (auto u : users)
          u->send_text("Update blocks");
      }

      num_transactions_++;

      res.set_header("Content-Type", "application/json");
      res.end(std::string(value->c_str()));
    } else {
      res.code = 500;
      res.set_header("Content-Type", "text/plain");
      res.end("get value fail");
    }
  });

  // Get values based on key range
  CROW_ROUTE(app, "/v1/transactions/<string>/<string>")
  ([this](const crow::request &req, response &res, std::string min_id,
          std::string max_id) {
    auto value = kv_client_.GetRange(min_id, max_id);
    if (value != nullptr) {
      LOG(INFO) << "client getrange value = " << value->c_str();

      // Send updated blocks list to websocket
      if (users.size() > 0) {
        for (auto u : users)
          u->send_text("Update blocks");
      }

      num_transactions_++;

      res.set_header("Content-Type", "application/json");
      res.end(std::string(value->c_str()));
    } else {
      res.code = 500;
      res.set_header("Content-Type", "text/plain");
      res.end("getrange fail");
    }
  });

  // Commit a key-value pair, extracting the id parameter from the JSON
  // object and setting the value as the entire JSON object
  CROW_ROUTE(app, "/v1/transactions/commit")
  .methods("POST"_method)([this](const request& req) {
    std::string body = req.body;
    LOG(INFO) << "body: " << body;

    // Parse transaction JSON
    rapidjson::Document doc;
    doc.Parse(body.c_str());
    if (!doc.IsObject() || !doc.HasMember("id")) {
      response res(400, "Invalid transaction format"); // Bad Request
      res.set_header("Content-Type", "text/plain");
      return res;
    }
    const std::string id = doc["id"].GetString();
    const std::string value = body;

    // Set key-value pair in kv server
    int retval = kv_client_.Set(id, value);

    if (retval != 0) {
      LOG(ERROR) << "Error when trying to commit id " << id;
      response res(500, "id: " + id);
      res.set_header("Content-Type", "text/plain");
      return res;
    }
    LOG(INFO) << "Set " << id << " to " << value;

    // Send updated blocks list to websocket
    if (users.size() > 0) {
      for (auto u : users)
        u->send_text("Update blocks");
    }

    num_transactions_++;

    response res(201, "id: " + id);  // Created status code
    res.set_header("Content-Type", "text/plain");
    return res;
  });

  CROW_ROUTE(app, "/v1/blocks")
  ([this](const crow::request &req, response &res) {
    auto values = GetAllBlocks(100);
    res.set_header("Content-Type", "application/json");
    res.end(values);
  });

  // Retrieve blocks in batches of size of the int parameter
  CROW_ROUTE(app, "/v1/blocks/<int>")
  ([this](const crow::request &req, response &res, int batch_size) {
    auto values = GetAllBlocks(batch_size, false, true);
    if (values == "") {
      res.code = 500;
      res.set_header("Content-Type", "text/plain");
      res.end("get replica state fail");
      exit(1);
    };
    res.set_header("Content-Type", "application/json");
    res.end(values);
  });

  // Retrieve blocks within a range
  CROW_ROUTE(app, "/v1/blocks/<int>/<int>")
  ([this](const crow::request &req, response &res, int min_seq, int max_seq) {
    auto resp = txn_client_.GetTxn(min_seq, max_seq);
    absl::StatusOr<std::vector<std::pair<uint64_t, std::string>>> GetTxn(
        uint64_t min_seq, uint64_t max_seq);
    if (!resp.ok()) {
      LOG(ERROR) << "get replica state fail";
      res.code = 500;
      res.set_header("Content-Type", "text/plain");
      res.end("get replica state fail");
      exit(1);
    }

    std::string values = "[\n";
    bool first_iteration = true;
    for (auto &txn : *resp) {
      BatchUserRequest request;
      KVRequest kv_request;

      std::string cur_batch_str = "";
      if (request.ParseFromString(txn.second)) {
        LOG(INFO) << request.DebugString();

        if (!first_iteration)
          cur_batch_str.append(",");
        first_iteration = false;

        // id
        uint64_t seq = txn.first;
        cur_batch_str.append("{\"id\": " + std::to_string(seq));

        // number
        cur_batch_str.append(", \"number\": \"" + std::to_string(seq) + "\"");

        // transactions
        cur_batch_str.append(", \"transactions\": [");
        bool first_transaction = true;
        for (auto &sub_req : request.user_requests()) {
          kv_request.ParseFromString(sub_req.request().data());
          std::string kv_request_json = ParseKVRequest(kv_request);

          if (!first_transaction)
            cur_batch_str.append(",");
          first_transaction = false;
          cur_batch_str.append(kv_request_json);
          cur_batch_str.append("\n");
        }
        cur_batch_str.append("]"); // close transactions list

        // size
        cur_batch_str.append(", \"size\": " +
                             std::to_string(request.ByteSizeLong()));

        // createdAt
        uint64_t createtime = request.createtime();
        cur_batch_str.append(", \"createdAt\": \"" +
                             ParseCreateTime(createtime) + "\"");
      }
      cur_batch_str.append("}\n");
      values.append(cur_batch_str);
    }
    values.append("]\n");
    res.set_header("Content-Type", "application/json");
    res.end(values);
  });

  CROW_ROUTE(app, "/blockupdatelistener")
      .websocket()
      .onopen([&](crow::websocket::connection &conn) {
        LOG(INFO) << "Opened websocket";
        std::lock_guard<std::mutex> _(mtx);
        users.insert(&conn);
      })
      .onclose(
          [&](crow::websocket::connection &conn, const std::string &reason) {
            LOG(INFO) << "Closed websocket";
            std::lock_guard<std::mutex> _(mtx);
            users.erase(&conn);
          })
      .onmessage([&](crow::websocket::connection & /*conn*/,
                     const std::string &data, bool is_binary) {
        // do nothing
      });

  // For metadata table on the Explorer
  CROW_ROUTE(app, "/populatetable")
  ([this](const crow::request& req, response& res) {

    absl::StatusOr<ReplicaState> state_or = state_client_.GetReplicaState();
	if(!state_or.ok()){
		  LOG(ERROR)<<"get state fail";
		res.set_header("Content-Type", "application/json");
		res.end("");
		return;
	}

    ResConfigData config_data = (*state_or).replica_config();
    ResDBConfig server_config(config_data, ReplicaInfo(), KeyInfo(), CertificateInfo());

    uint32_t replica_num = server_config.GetReplicaNum();
    uint32_t worker_num = server_config.GetWorkerNum();
    uint32_t client_batch_num = server_config.ClientBatchNum();
    uint32_t max_process_txn = server_config.GetMaxProcessTxn();
    uint32_t client_batch_wait_time = server_config.ClientBatchWaitTimeMS();
    uint32_t input_worker_num = server_config.GetInputWorkerNum();
    uint32_t output_worker_num = server_config.GetOutputWorkerNum();
    int client_timeout_ms = server_config.GetClientTimeoutMs();
    int min_data_receive_num = server_config.GetMinDataReceiveNum();
    size_t max_malicious_replica_num = server_config.GetMaxMaliciousReplicaNum();
    int checkpoint_water_mark = server_config.GetCheckPointWaterMark();

    // Don't read the ledger if first commit time is known
    if (first_commit_time_ == 0) {
      // Get first block in the chain
      auto resp = txn_client_.GetTxn(1, 1);
      // absl::StatusOr<std::vector<std::pair<uint64_t, std::string>>> GetTxn(
      //     uint64_t min_seq, uint64_t max_seq);
      if (!resp.ok()) {
        LOG(ERROR) << "get replica state fail";
        res.end("get replica state fail");
      };

      for (auto& txn : *resp) {
        BatchUserRequest request;
        KVRequest kv_request;
        if (request.ParseFromString(txn.second)) {
          // transactions
          for (auto& sub_req : request.user_requests()) {
            kv_request.ParseFromString(sub_req.request().data());
            std::string kv_request_json = ParseKVRequest(kv_request);
          }

          // see resilientdb/common/utils/utils.cpp
          first_commit_time_ = request.createtime() / 1000000;
        }
      }
    }

    uint64_t epoch_time = std::chrono::duration_cast<std::chrono::seconds>(
      std::chrono::system_clock::now().time_since_epoch()
    ).count();
    uint64_t chain_age = first_commit_time_ == 0 ? 
                          0 : epoch_time - first_commit_time_;

    auto block_num_resp = txn_client_.GetBlockNumbers();
    if (!block_num_resp.ok()) {
      LOG(ERROR) << "get number fail";
      exit(1);
    }

    std::string values = "";
    values.append("[{ \"replicaNum\": " + std::to_string(replica_num)
                      + ", \"workerNum\" : " + std::to_string(worker_num) 
                      + ", \"clientBatchNum\" : " + std::to_string(client_batch_num)
                      + ", \"maxProcessTxn\" : " + std::to_string(max_process_txn) 
                      + ", \"clientBatchWaitTime\" : " + std::to_string(client_batch_wait_time)
                      + ", \"inputWorkerNum\" : " + std::to_string(input_worker_num)
                      + ", \"outputWorkerNum\" : " + std::to_string(output_worker_num)
                      + ", \"clientTimeoutMs\" : " + std::to_string(client_timeout_ms)
                      + ", \"minDataReceiveNum\" : " + std::to_string(min_data_receive_num)
                      + ", \"maxMaliciousReplicaNum\" : " + std::to_string(max_malicious_replica_num)
                      + ", \"checkpointWaterMark\" : " + std::to_string(checkpoint_water_mark)
                      + ", \"transactionNum\" : " + std::to_string(num_transactions_)
                      + ", \"blockNum\" : " + std::to_string(*block_num_resp)
                      + ", \"chainAge\" : " + std::to_string(chain_age)
                      + "}]");
    LOG(INFO) << std::string(values.c_str());
    res.set_header("Content-Type", "application/json");
    res.end(std::string(values.c_str()));
  });

  // Run the Crow app
  app.port(port_num_).multithreaded().run();
}