plasma-stream/src/client/mod.rs (53 lines of code) (raw):

// Copyright (c) Facebook, Inc. and its affiliates. // // This source code is licensed under the MIT license found in the // LICENSE file in the root directory of this source tree. use crate::{ errors::{ClientError, PeerResult}, ObjectId, PeerRequest, Request, }; use tokio::{ io::AsyncReadExt, net::{TcpStream, ToSocketAddrs}, }; // CLIENT // ================================================================================================ pub struct Client { socket: TcpStream, } impl Client { /// Connects to the Plasma Stream server at the specified address. pub async fn connect<T: ToSocketAddrs>(address: T) -> Result<Self, std::io::Error> { let socket = TcpStream::connect(address).await?; let client = Client { socket }; Ok(client) } /// Retrieves objects with the specified IDs from the remote plasma store. pub fn copy(&self, _object_ids: &[ObjectId]) { // TODO: implement unimplemented!("not yet implemented"); } /// Retrieves objects with the specified IDs from Plasma Stream server. The retrieved /// objects are deleted from the remote plasma store. pub fn take(&self, _object_ids: &[ObjectId]) { // TODO: implement unimplemented!("not yet implemented"); } /// Instructs the Plasma Stream server to execute the specified requests. pub async fn sync(&mut self, requests: Vec<PeerRequest>) -> Result<(), ClientError> { let num_requests = requests.len(); let request = Request::Sync(requests); request.validate().map_err(ClientError::MalformedRequest)?; // send the request request.write_into(&mut self.socket).await.map_err(|err| { ClientError::ConnectionError(String::from("failed to send a request"), err) })?; // read the response; there should be exactly one byte returned for every // peer request sent let mut response = vec![0u8; num_requests]; self.socket.read_exact(&mut response).await.map_err(|err| { ClientError::ConnectionError(String::from("failed to get a response"), err) })?; // check if the response contains any errors parse_sync_response(&response) } } // HELPER FUNCTIONS // ================================================================================================ fn parse_sync_response(response: &[u8]) -> Result<(), ClientError> { let mut results = Vec::with_capacity(response.len()); let mut err_count = 0; for peer_response in response { let result = PeerResult::from(*peer_response); if !result.is_ok() { err_count += 1; } results.push(result); } if err_count > 0 { Err(ClientError::SyncError(results)) } else { Ok(()) } }