mempool/src/batch_maker.rs (102 lines of code) (raw):

// Copyright (c) Facebook, Inc. and its affiliates. use crate::mempool::SerializedBatch; #[cfg(feature = "benchmark")] use crypto::Digest; #[cfg(feature = "benchmark")] use ed25519_dalek::{Digest as _, Sha512}; #[cfg(feature = "benchmark")] use log::info; #[cfg(feature = "benchmark")] use std::convert::TryInto as _; use tokio::sync::mpsc::{Receiver, Sender}; use tokio::time::{sleep, Duration, Instant}; #[cfg(test)] #[path = "tests/batch_maker_tests.rs"] pub mod batch_maker_tests; pub type Transaction = Vec<u8>; pub type Batch = Vec<Transaction>; /// Assemble clients transactions into batches. pub struct BatchMaker { /// The preferred batch size (in bytes). batch_size: usize, /// The maximum delay after which to seal the batch (in ms). max_batch_delay: u64, /// Channel to receive transactions from the network. rx_transaction: Receiver<Transaction>, /// Output channel to deliver sealed batches to the `QuorumWaiter`. tx_message: Sender<SerializedBatch>, /// Holds the current batch. current_batch: Batch, /// Holds the size of the current batch (in bytes). current_batch_size: usize, } impl BatchMaker { pub fn spawn( batch_size: usize, max_batch_delay: u64, rx_transaction: Receiver<Transaction>, tx_message: Sender<SerializedBatch>, ) { tokio::spawn(async move { Self { batch_size, max_batch_delay, rx_transaction, tx_message, current_batch: Batch::with_capacity(batch_size * 2), current_batch_size: 0, } .run() .await; }); } /// Main loop receiving incoming transactions and creating batches. async fn run(&mut self) { let timer = sleep(Duration::from_millis(self.max_batch_delay)); tokio::pin!(timer); loop { tokio::select! { // Assemble client transactions into batches of preset size. Some(transaction) = self.rx_transaction.recv() => { self.current_batch_size += transaction.len(); self.current_batch.push(transaction); if self.current_batch_size >= self.batch_size { self.seal().await; timer.as_mut().reset(Instant::now() + Duration::from_millis(self.max_batch_delay)); } }, // If the timer triggers, seal the batch even if it contains few transactions. () = &mut timer => { if !self.current_batch.is_empty() { self.seal().await; } timer.as_mut().reset(Instant::now() + Duration::from_millis(self.max_batch_delay)); } } // Give the change to schedule other tasks. tokio::task::yield_now().await; } } /// Seal and broadcast the current batch. async fn seal(&mut self) { #[cfg(feature = "benchmark")] let size = self.current_batch_size; // Look for sample txs (they all start with 0) and gather their txs id (the next 8 bytes). #[cfg(feature = "benchmark")] let tx_ids: Vec<_> = self .current_batch .iter() .filter(|tx| tx[0] == 0u8 && tx.len() > 8) .filter_map(|tx| tx[1..9].try_into().ok()) .collect(); // Serialize the batch. self.current_batch_size = 0; let batch: Vec<_> = self.current_batch.drain(..).collect(); let serialized = bincode::serialize(&batch).expect("Failed to serialize our own batch"); #[cfg(feature = "benchmark")] { // NOTE: This is one extra hash that is only needed to print the following log entries. let digest = Digest( Sha512::digest(&serialized).as_slice()[..32] .try_into() .unwrap(), ); for id in tx_ids { // NOTE: This log entry is used to compute performance. info!( "Batch {:?} contains sample tx {}", digest, u64::from_be_bytes(id) ); } // NOTE: This log entry is used to compute performance. info!("Batch {:?} contains {} B", digest, size); } // Send the batch through the deliver channel for further processing. self.tx_message .send(serialized) .await .expect("Failed to deliver batch"); } }