fn handle_message()

in fastpay/src/network.rs [128:216]


    fn handle_message<'a>(
        &'a mut self,
        buffer: &'a [u8],
    ) -> futures::future::BoxFuture<'a, Option<Vec<u8>>> {
        Box::pin(async move {
            let result = deserialize_message(buffer);
            let reply = match result {
                Err(_) => Err(FastPayError::InvalidDecoding),
                Ok(result) => {
                    match result {
                        SerializedMessage::Order(message) => self
                            .server
                            .state
                            .handle_transfer_order(*message)
                            .map(|info| Some(serialize_info_response(&info))),
                        SerializedMessage::Cert(message) => {
                            let confirmation_order = ConfirmationOrder {
                                transfer_certificate: message.as_ref().clone(),
                            };
                            match self
                                .server
                                .state
                                .handle_confirmation_order(confirmation_order)
                            {
                                Ok((info, send_shard)) => {
                                    // Send a message to other shard
                                    if let Some(cross_shard_update) = send_shard {
                                        let shard = cross_shard_update.shard_id;
                                        let tmp_out = serialize_cross_shard(&message);
                                        debug!(
                                            "Scheduling cross shard query: {} -> {}",
                                            self.server.state.shard_id, shard
                                        );
                                        self.cross_shard_sender
                                            .send((tmp_out, shard))
                                            .await
                                            .expect("internal channel should not fail");
                                    };

                                    // Response
                                    Ok(Some(serialize_info_response(&info)))
                                }
                                Err(error) => Err(error),
                            }
                        }
                        SerializedMessage::InfoReq(message) => self
                            .server
                            .state
                            .handle_account_info_request(*message)
                            .map(|info| Some(serialize_info_response(&info))),
                        SerializedMessage::CrossShard(message) => {
                            match self
                                .server
                                .state
                                .handle_cross_shard_recipient_commit(*message)
                            {
                                Ok(_) => Ok(None), // Nothing to reply
                                Err(error) => {
                                    error!("Failed to handle cross-shard query: {}", error);
                                    Ok(None) // Nothing to reply
                                }
                            }
                        }
                        _ => Err(FastPayError::UnexpectedMessage),
                    }
                }
            };

            self.server.packets_processed += 1;
            if self.server.packets_processed % 5000 == 0 {
                info!(
                    "{}:{} (shard {}) has processed {} packets",
                    self.server.base_address,
                    self.server.base_port + self.server.state.shard_id,
                    self.server.state.shard_id,
                    self.server.packets_processed
                );
            }

            match reply {
                Ok(x) => x,
                Err(error) => {
                    warn!("User query failed: {}", error);
                    self.server.user_errors += 1;
                    Some(serialize_error(&error))
                }
            }
        })
    }