in fastpay/src/network.rs [128:216]
fn handle_message<'a>(
&'a mut self,
buffer: &'a [u8],
) -> futures::future::BoxFuture<'a, Option<Vec<u8>>> {
Box::pin(async move {
let result = deserialize_message(buffer);
let reply = match result {
Err(_) => Err(FastPayError::InvalidDecoding),
Ok(result) => {
match result {
SerializedMessage::Order(message) => self
.server
.state
.handle_transfer_order(*message)
.map(|info| Some(serialize_info_response(&info))),
SerializedMessage::Cert(message) => {
let confirmation_order = ConfirmationOrder {
transfer_certificate: message.as_ref().clone(),
};
match self
.server
.state
.handle_confirmation_order(confirmation_order)
{
Ok((info, send_shard)) => {
// Send a message to other shard
if let Some(cross_shard_update) = send_shard {
let shard = cross_shard_update.shard_id;
let tmp_out = serialize_cross_shard(&message);
debug!(
"Scheduling cross shard query: {} -> {}",
self.server.state.shard_id, shard
);
self.cross_shard_sender
.send((tmp_out, shard))
.await
.expect("internal channel should not fail");
};
// Response
Ok(Some(serialize_info_response(&info)))
}
Err(error) => Err(error),
}
}
SerializedMessage::InfoReq(message) => self
.server
.state
.handle_account_info_request(*message)
.map(|info| Some(serialize_info_response(&info))),
SerializedMessage::CrossShard(message) => {
match self
.server
.state
.handle_cross_shard_recipient_commit(*message)
{
Ok(_) => Ok(None), // Nothing to reply
Err(error) => {
error!("Failed to handle cross-shard query: {}", error);
Ok(None) // Nothing to reply
}
}
}
_ => Err(FastPayError::UnexpectedMessage),
}
}
};
self.server.packets_processed += 1;
if self.server.packets_processed % 5000 == 0 {
info!(
"{}:{} (shard {}) has processed {} packets",
self.server.base_address,
self.server.base_port + self.server.state.shard_id,
self.server.state.shard_id,
self.server.packets_processed
);
}
match reply {
Ok(x) => x,
Err(error) => {
warn!("User query failed: {}", error);
self.server.user_errors += 1;
Some(serialize_error(&error))
}
}
})
}