in worker/src/worker.rs [138:194]
fn handle_clients_transactions(&self, tx_primary: Sender<SerializedBatchDigestMessage>) {
let (tx_batch_maker, rx_batch_maker) = channel(CHANNEL_CAPACITY);
let (tx_quorum_waiter, rx_quorum_waiter) = channel(CHANNEL_CAPACITY);
let (tx_processor, rx_processor) = channel(CHANNEL_CAPACITY);
// We first receive clients' transactions from the network.
let mut address = self
.committee
.worker(&self.name, &self.id)
.expect("Our public key or worker id is not in the committee")
.transactions;
address.set_ip("0.0.0.0".parse().unwrap());
Receiver::spawn(
address,
/* handler */ TxReceiverHandler { tx_batch_maker },
);
// The transactions are sent to the `BatchMaker` that assembles them into batches. It then broadcasts
// (in a reliable manner) the batches to all other workers that share the same `id` as us. Finally, it
// gathers the 'cancel handlers' of the messages and send them to the `QuorumWaiter`.
BatchMaker::spawn(
self.parameters.batch_size,
self.parameters.max_batch_delay,
/* rx_transaction */ rx_batch_maker,
/* tx_message */ tx_quorum_waiter,
/* workers_addresses */
self.committee
.others_workers(&self.name, &self.id)
.iter()
.map(|(name, addresses)| (*name, addresses.worker_to_worker))
.collect(),
);
// The `QuorumWaiter` waits for 2f authorities to acknowledge reception of the batch. It then forwards
// the batch to the `Processor`.
QuorumWaiter::spawn(
self.committee.clone(),
/* stake */ self.committee.stake(&self.name),
/* rx_message */ rx_quorum_waiter,
/* tx_batch */ tx_processor,
);
// The `Processor` hashes and stores the batch. It then forwards the batch's digest to the `PrimaryConnector`
// that will send it to our primary machine.
Processor::spawn(
self.id,
self.store.clone(),
/* rx_batch */ rx_processor,
/* tx_digest */ tx_primary,
/* own_batch */ true,
);
info!(
"Worker {} listening to client transactions on {}",
self.id, address
);
}