in worker/src/worker.rs [197:239]
/// Spawns every task that deals with messages coming from other workers.
///
/// Two channels are created with `CHANNEL_CAPACITY` and used to wire three components:
/// - a `Receiver` whose `WorkerReceiverHandler` owns the sending half of both channels;
/// - a `Helper` that consumes the receiving half of the helper channel;
/// - a `Processor` that consumes the receiving half of the processor channel and is handed
///   `tx_primary` as its digest output.
///
/// # Panics
///
/// Panics if the committee has no entry matching this worker's `name` and `id`.
fn handle_workers_messages(&self, tx_primary: Sender<SerializedBatchDigestMessage>) {
    // One channel per downstream consumer of the network handler.
    let (helper_sender, helper_receiver) = channel(CHANNEL_CAPACITY);
    let (processor_sender, processor_receiver) = channel(CHANNEL_CAPACITY);

    // Look up the address on which other workers reach us.
    let own_addresses = self
        .committee
        .worker(&self.name, &self.id)
        .expect("Our public key or worker id is not in the committee");
    let mut listen_address = own_addresses.worker_to_worker;

    // Only the IP part is overwritten with `0.0.0.0`; presumably this makes the listener accept
    // connections on every local interface rather than only the advertised one.
    let any_ip = "0.0.0.0".parse().unwrap();
    listen_address.set_ip(any_ip);

    // Receive incoming messages from other workers and dispatch them through the handler.
    let handler = WorkerReceiverHandler {
        tx_helper: helper_sender,
        tx_processor: processor_sender,
    };
    Receiver::spawn(listen_address, handler);

    // The `Helper` is dedicated to replying to batch requests from other workers; it reads those
    // requests from the helper channel.
    Helper::spawn(
        self.id,
        self.committee.clone(),
        self.store.clone(),
        helper_receiver,
    );

    // This `Processor` hashes and stores the batches received from the other workers, then
    // forwards each batch's digest to the `PrimaryConnector`, which sends it to our primary.
    // The last argument (`own_batch`) is `false` because these batches were not created by us.
    Processor::spawn(
        self.id,
        self.store.clone(),
        processor_receiver,
        tx_primary,
        false,
    );

    info!(
        "Worker {} listening to worker messages on {}",
        self.id, listen_address
    );
}