shed/fbthrift_ext/tcp/lib.rs (50 lines of code) (raw):
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under both the MIT license found in the
* LICENSE-MIT file in the root directory of this source tree and the Apache
* License, Version 2.0 found in the LICENSE-APACHE file in the root directory
* of this source tree.
*/
use anyhow::{Error, Result};
use bytes::{Bytes, BytesMut};
use fbthrift::{Framing, FramingDecoded, FramingEncodedFinal, Transport};
use fbthrift_framed::FramedTransport;
use fbthrift_util::poll_with_lock;
use futures::future::FutureExt;
use std::ffi::CStr;
use std::future::Future;
use std::io::Cursor;
use std::pin::Pin;
use std::sync::Arc;
use tokio::net::TcpStream;
use tokio::sync::Mutex;
use tokio_tower::pipeline::client::Client;
use tokio_util::codec::{Decoder, Framed};
use tower_service::Service;
/// ```ignore
/// let stream = tokio::net::TcpStream::connect(path)?;
/// let transport = TcpTransport::new(stream);
/// let client = <dyn fb303::client::FacebookService>::new(CompactProtocol, transport);
/// ```
pub struct TcpTransport {
service: Arc<Mutex<Client<Framed<TcpStream, FramedTransport>, Error, Bytes>>>,
}
impl TcpTransport {
pub fn new(stream: TcpStream) -> Self {
TcpTransport {
service: Arc::new(Mutex::new(Client::new(FramedTransport.framed(stream)))),
}
}
}
impl Framing for TcpTransport {
type EncBuf = BytesMut;
type DecBuf = Cursor<Bytes>;
fn enc_with_capacity(cap: usize) -> Self::EncBuf {
BytesMut::with_capacity(cap)
}
}
impl Transport for TcpTransport {
type RpcOptions = ();
fn call(
&self,
_service_name: &'static CStr,
_fn_name: &'static CStr,
req: FramingEncodedFinal<Self>,
_rpc_options: Self::RpcOptions,
) -> Pin<Box<dyn Future<Output = Result<FramingDecoded<Self>>> + Send + 'static>> {
let svc = self.service.clone();
(async move {
let locked = poll_with_lock(&svc, |locked, ctx| locked.poll_ready(ctx)).await;
match locked {
Ok(mut locked) => locked.call(req).await,
Err(e) => Err(e),
}
})
.boxed()
}
}