thrift/compiler/generate/templates/rust/lib/server.mustache (479 lines of code) (raw):

{{! Copyright (c) Facebook, Inc. and its affiliates. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. }} {{#service:interactions}}{{>lib/server}} {{/service:interactions}} {{#service:docs?}} #[doc = {{service:docs}}] {{/service:docs?}} #[::async_trait::async_trait] pub trait {{service:name}}: ::std::marker::Send + ::std::marker::Sync + 'static {{>lib/block}}{{! }}{{#service:requestContext?}} type RequestContext: ::std::marker::Sync;{{! }}{{/service:requestContext?}} {{#service:rustFunctions}}{{^function:starts_interaction?}}{{^function:returns_streams?}} {{#function:docs?}} #[doc = {{function:docs}}] {{/function:docs?}} async fn {{function:rust_name}}( &self,{{! }}{{#service:requestContext?}} _request_context: &Self::RequestContext,{{! }}{{/service:requestContext?}}{{! }}{{#function:args}} _{{field:name}}: {{#field:type}}{{>lib/type}}{{/field:type}},{{! }}{{/function:args}} ) -> ::std::result::Result<{{! }}{{#function:return_type}}{{>lib/type}}{{/function:return_type}}, {{! }}{{program:crate}}::services::{{service:snake}}::{{function:upcamel}}Exn{{! }}> { ::std::result::Result::Err({{program:crate}}::services::{{service:snake}}::{{function:upcamel}}Exn::ApplicationException( ::fbthrift::ApplicationException::unimplemented_method( "{{service:name}}", "{{function:name}}", ), )) } {{/function:returns_streams?}}{{#function:returns_streams?}} // {{function:rust_name}}: server-side streaming not yet implemented {{/function:returns_streams?}} {{/function:starts_interaction?}}{{#function:starts_interaction?}} {{#function:docs?}} #[doc = {{function:docs}}] {{/function:docs?}} fn {{function:rust_name}}( &self, ) -> ::anyhow::Result<::std::boxed::Box<dyn {{function:interaction_name}}>> { ::anyhow::bail!("{{service:name}}.{{function:name}} not implemented"); } {{/function:starts_interaction?}}{{/service:rustFunctions}} } #[::async_trait::async_trait] impl<T> {{service:name}} for ::std::boxed::Box<T> where T: {{service:name}} + Send + Sync + ?Sized, {{>lib/block}} {{#service:requestContext?}} type RequestContext = T::RequestContext;{{! }}{{/service:requestContext?}} {{#service:rustFunctions}}{{^function:starts_interaction?}}{{^function:returns_streams?}} async fn {{function:rust_name}}( &self,{{! }}{{#service:requestContext?}} request_context: &Self::RequestContext,{{! }}{{/service:requestContext?}}{{! }}{{#function:args}} {{field:rust_name}}: {{#field:type}}{{>lib/type}}{{/field:type}},{{! }}{{/function:args}} ) -> ::std::result::Result<{{! }}{{#function:return_type}}{{>lib/type}}{{/function:return_type}}, {{! }}{{program:crate}}::services::{{service:snake}}::{{function:upcamel}}Exn{{! }}> { (**self).{{function:rust_name}}({{! }}{{#service:requestContext?}} request_context, {{! }}{{/service:requestContext?}}{{! }}{{#function:args}} {{field:rust_name}}, {{! }}{{/function:args}} ).await } {{/function:returns_streams?}}{{#function:returns_streams?}} // {{function:rust_name}}: server-side streaming not yet implemented {{/function:returns_streams?}} {{/function:starts_interaction?}}{{#function:starts_interaction?}} fn {{function:rust_name}}( &self, ) -> ::anyhow::Result<::std::boxed::Box<dyn {{function:interaction_name}}>> { (**self).{{function:rust_name}}() } {{/function:starts_interaction?}}{{/service:rustFunctions}} } /// Processor for {{service:name}}'s methods. #[derive(Clone, Debug)] pub struct {{service:name}}Processor<P, H, R{{! }}{{#service:extends?}}, SS{{/service:extends?}}{{! }}{{! }}> { service: H,{{! }}{{#service:extends?}} supa: SS,{{! }}{{/service:extends?}}{{! }}{{^service:extends?}} supa: ::fbthrift::NullServiceProcessor<P, R>,{{! }}{{/service:extends?}} _phantom: ::std::marker::PhantomData<(P, H, R)>, } {{#service:rustFunctions}}{{^function:starts_interaction?}}{{^function:returns_streams?}} struct Args_{{service:name}}_{{function:name}} { {{#function:args}} {{field:rust_name}}: {{>lib/fieldtype}}, {{/function:args}} } impl<P: ::fbthrift::ProtocolReader> ::fbthrift::Deserialize<P> for self::Args_{{service:name}}_{{function:name}} { #[inline]{{! No cost because there's only one caller; with luck will mitigate move cost of args. }} #[::tracing::instrument(skip_all, level = "trace", name = "deserialize_args", fields(method = "{{service:name}}.{{function:name}}"))] fn read(p: &mut P) -> ::anyhow::Result<Self> { static ARGS: &[::fbthrift::Field] = &[ {{#function:args_by_name}} ::fbthrift::Field::new("{{field:name}}", {{#field:type}}{{>lib/ttype}}{{/field:type}}, {{field:key}}), {{/function:args_by_name}} ]; {{#function:args}} let mut field_{{field:name}} = ::std::option::Option::None; {{/function:args}} let _ = p.read_struct_begin(|_| ())?; loop { let (_, fty, fid) = p.read_field_begin(|_| (), ARGS)?; match (fty, fid as ::std::primitive::i32) { (::fbthrift::TType::Stop, _) => break,{{! }}{{#function:args}} ({{#field:type}}{{>lib/ttype}}{{/field:type}}, {{field:key}}) => {{! }}field_{{field:name}} = ::std::option::Option::Some({{#field:type}}{{>lib/read}}{{/field:type}}(p)?),{{! }}{{/function:args}} (fty, _) => p.skip(fty)?, } p.read_field_end()?; } p.read_struct_end()?; {{! Use formatting in errors to try to maximize string sharing }} ::std::result::Result::Ok(Self {{>lib/block}}{{! }}{{#function:args}} {{field:rust_name}}: field_{{field:name}}.ok_or_else(|| ::anyhow::anyhow!("`{}` missing arg `{}`", "{{service:name}}.{{function:name}}", "{{field:name}}"))?,{{! }}{{/function:args}} }) } } {{/function:returns_streams?}}{{/function:starts_interaction?}}{{/service:rustFunctions}} impl<P, H, R{{! }}{{#service:extends?}}, SS{{/service:extends?}}{{! }}> {{service:name}}Processor<P, H, R{{! }}{{#service:extends?}}, SS{{/service:extends?}}{{! }}> where P: ::fbthrift::Protocol + ::std::marker::Send + ::std::marker::Sync + 'static, P::Deserializer: ::std::marker::Send, H: {{service:name}}{{! }}{{#service:requestContext?}}<RequestContext = R>{{/service:requestContext?}}, R: ::fbthrift::RequestContext<Name = ::std::ffi::CStr> + ::std::marker::Sync, <R as ::fbthrift::RequestContext>::ContextStack: ::fbthrift::ContextStack<Name = R::Name, Buffer = ::fbthrift::ProtocolDecoded<P>> + ::std::marker::Send + ::std::marker::Sync,{{! }}{{#service:extends?}} SS: ::fbthrift::ThriftService<P::Frame>, SS::Handler: {{>lib/super}}, P::Frame: ::std::marker::Send + 'static,{{! }}{{/service:extends?}} { pub fn new({{! }}service: H{{! }}{{#service:extends?}}, supa: SS{{/service:extends?}}{{! }}) -> Self { Self { service, supa{{^service:extends?}}{{! }}: ::fbthrift::NullServiceProcessor::new(){{! }}{{/service:extends?}}, _phantom: ::std::marker::PhantomData, } } pub fn into_inner(self) -> H { self.service }{{! }}{{#service:rustFunctions}}{{^function:starts_interaction?}}{{^function:returns_streams?}} #[::tracing::instrument(skip_all, fields(method = "{{service:name}}.{{function:name}}"))] async fn handle_{{function:name}}<'a>( &'a self, p: &'a mut P::Deserializer, _req_ctxt: &R, ctx_stack: &mut R::ContextStack, ) -> ::anyhow::Result<{{program:crate}}::services::{{service:snake}}::{{function:upcamel}}Exn> { use ::const_cstr::const_cstr; use ::tracing::Instrument as _; use ::futures::FutureExt as _; const_cstr! { SERVICE_NAME = "{{service:name}}"; METHOD_NAME = "{{service:name}}.{{function:name}}"; } ::fbthrift::ContextStack::pre_read(ctx_stack)?; let _args: self::Args_{{service:name}}_{{function:name}} = ::fbthrift::Deserialize::read(p)?; ::fbthrift::ContextStack::on_read_data(ctx_stack, &::fbthrift::SerializedMessage { protocol: P::PROTOCOL_ID, method_name: METHOD_NAME.as_cstr(), buffer: ::std::marker::PhantomData, // FIXME P::into_buffer(p).reset(), })?; ::fbthrift::ContextStack::post_read(ctx_stack, 0)?; let res = ::std::panic::AssertUnwindSafe( self.service.{{function:rust_name}}({{! }}{{#service:requestContext?}} _req_ctxt,{{! }}{{/service:requestContext?}}{{! }}{{#function:args}} _args.{{field:rust_name}},{{! }}{{/function:args}} ) ) .catch_unwind() .instrument(::tracing::info_span!("service_handler", method = "{{service:name}}.{{function:name}}")) .await; // nested results - panic catch on the outside, method on the inside let res = match res { ::std::result::Result::Ok(::std::result::Result::Ok(res)) => { ::tracing::info!(method = "{{service:name}}.{{function:name}}", "success"); {{program:crate}}::services::{{service:snake}}::{{function:upcamel}}Exn::Success(res) } ::std::result::Result::Ok(::std::result::Result::Err({{program:crate}}::services::{{service:snake}}::{{function:upcamel}}Exn::Success(_))) => { panic!( "{} attempted to return success via error", "{{function:rust_name}}", ) } ::std::result::Result::Ok(::std::result::Result::Err(exn)) => { ::tracing::error!(method = "{{service:name}}.{{function:name}}", exception = ?exn); exn } ::std::result::Result::Err(exn) => { let aexn = ::fbthrift::ApplicationException::handler_panic("{{service:name}}.{{function:name}}", exn); {{program:crate}}::services::{{service:snake}}::{{function:upcamel}}Exn::ApplicationException(aexn) } }; ::std::result::Result::Ok(res) }{{! }}{{/function:returns_streams?}}{{/function:starts_interaction?}}{{#function:starts_interaction?}} fn handle_{{function:name}}( &self, ) -> ::anyhow::Result<::std::boxed::Box<dyn {{function:interaction_name}}>> { self.service.{{function:rust_name}}() }{{! }}{{/function:starts_interaction?}}{{! }}{{/service:rustFunctions}} } #[::async_trait::async_trait] impl<P, H, R{{! }}{{#service:extends?}}, SS{{/service:extends?}}{{! }}> ::fbthrift::ServiceProcessor<P> for {{service:name}}Processor<P, H, R{{! }}{{#service:extends?}}, SS{{/service:extends?}}{{! }}> where P: ::fbthrift::Protocol + ::std::marker::Send + ::std::marker::Sync + 'static, P::Deserializer: ::std::marker::Send, H: {{service:name}}{{! }}{{#service:requestContext?}}<RequestContext = R>{{/service:requestContext?}},{{! }}{{#service:extends?}} SS: ::fbthrift::ThriftService<P::Frame>, SS::Handler: {{>lib/super}},{{! }}{{/service:extends?}} P::Frame: ::std::marker::Send + 'static, R: ::fbthrift::RequestContext<Name = ::std::ffi::CStr> + ::std::marker::Send + ::std::marker::Sync + 'static, <R as ::fbthrift::RequestContext>::ContextStack: ::fbthrift::ContextStack<Name = R::Name, Buffer = ::fbthrift::ProtocolDecoded<P>> + ::std::marker::Send + ::std::marker::Sync + 'static { type RequestContext = R; #[inline] fn method_idx(&self, name: &[::std::primitive::u8]) -> ::std::result::Result<::std::primitive::usize, ::fbthrift::ApplicationException> { match name {{>lib/block}}{{! }}{{#service:rustFunctions}}{{^function:starts_interaction?}}{{^function:returns_streams?}} b"{{#service:interaction?}}{{service:name}}.{{/service:interaction?}}{{function:name}}" => ::std::result::Result::Ok({{function:index}}usize),{{! }}{{/function:returns_streams?}}{{/function:starts_interaction?}}{{/service:rustFunctions}} _ => ::std::result::Result::Err(::fbthrift::ApplicationException::unknown_method()), } } async fn handle_method( &self, idx: ::std::primitive::usize, _p: &mut P::Deserializer, _r: &R, _seqid: ::std::primitive::u32, ) -> ::anyhow::Result<::fbthrift::ProtocolEncodedFinal<P>> { match idx {{>lib/block}}{{! }}{{#service:rustFunctions}}{{^function:starts_interaction?}}{{^function:returns_streams?}} {{function:index}}usize => { use const_cstr::const_cstr; const_cstr! { SERVICE_NAME = "{{service:name}}"; METHOD_NAME = "{{service:name}}.{{function:name}}"; } let mut ctx_stack = _r.get_context_stack( SERVICE_NAME.as_cstr(), METHOD_NAME.as_cstr(), )?; let res = self.handle_{{function:name}}(_p, _r, &mut ctx_stack).await?; let env = ::fbthrift::help::serialize_result_envelope::<P, R, _>( "{{function:name}}", METHOD_NAME.as_cstr(), _seqid, _r, &mut ctx_stack, res )?; Ok(env) }{{! }}{{/function:returns_streams?}}{{/function:starts_interaction?}}{{/service:rustFunctions}} bad => panic!( "{}: unexpected method idx {}", "{{service:name}}Processor", bad ), } } #[inline] fn create_interaction_idx(&self, name: &str) -> ::anyhow::Result<::std::primitive::usize> { match name {{>lib/block}}{{! }}{{#service:rustFunctions}}{{#function:starts_interaction?}} "{{function:interaction_name}}" => ::std::result::Result::Ok({{function:index}}usize),{{! }}{{/function:starts_interaction?}}{{/service:rustFunctions}} _ => ::anyhow::bail!("Unknown interaction"), } } fn handle_create_interaction( &self, idx: ::std::primitive::usize, ) -> ::anyhow::Result< ::std::sync::Arc<dyn ::fbthrift::ThriftService<P::Frame, Handler = (), RequestContext = Self::RequestContext> + ::std::marker::Send + 'static> > { match idx {{>lib/block}}{{! }}{{#service:rustFunctions}}{{#function:starts_interaction?}} {{function:index}}usize => { let handler = self.handle_{{function:name}}()?; let server = ::std::sync::Arc::new({{function:interaction_name}}Processor::<P, ::std::boxed::Box<dyn {{function:interaction_name}}>, R>::new(handler)); Ok(server) }{{! }}{{/function:starts_interaction?}}{{/service:rustFunctions}} bad => panic!( "{}: unexpected method idx {}", "{{service:name}}Processor", bad ), } } } #[::async_trait::async_trait] impl<P, H, R{{! }}{{#service:extends?}}, SS{{/service:extends?}}{{! }}> ::fbthrift::ThriftService<P::Frame> for {{service:name}}Processor<P, H, R{{! }}{{#service:extends?}}, SS{{/service:extends?}}{{! }}> where P: ::fbthrift::Protocol + ::std::marker::Send + ::std::marker::Sync + 'static, P::Deserializer: ::std::marker::Send, P::Frame: ::std::marker::Send + 'static, H: {{service:name}}{{! }}{{#service:requestContext?}}<RequestContext = R>{{/service:requestContext?}},{{! }}{{#service:extends?}} SS: ::fbthrift::ThriftService<P::Frame, RequestContext = R>, SS::Handler: {{>lib/super}}, P::Frame: ::std::marker::Send + 'static,{{! }}{{/service:extends?}} R: ::fbthrift::RequestContext<Name = ::std::ffi::CStr> + ::std::marker::Send + ::std::marker::Sync + 'static, <R as ::fbthrift::RequestContext>::ContextStack: ::fbthrift::ContextStack<Name = R::Name, Buffer = ::fbthrift::ProtocolDecoded<P>> + ::std::marker::Send + ::std::marker::Sync + 'static { {{#service:interaction?}} // Interactions have () as their handler associated type // to make `create_interaction` have a common return type. type Handler = (); {{/service:interaction?}}{{^service:interaction?}} type Handler = H; {{/service:interaction?}} type RequestContext = R; #[tracing::instrument(level="trace", skip_all, fields(service = "{{service:name}}"))] async fn call( &self, req: ::fbthrift::ProtocolDecoded<P>, req_ctxt: &R, ) -> ::anyhow::Result<::fbthrift::ProtocolEncodedFinal<P>> { use ::fbthrift::{BufExt as _, ProtocolReader as _, ServiceProcessor as _}; let mut p = P::deserializer(req); let (idx, mty, seqid) = p.read_message_begin(|name| self.method_idx(name))?; if mty != ::fbthrift::MessageType::Call { return ::std::result::Result::Err(::std::convert::From::from(::fbthrift::ApplicationException::new( ::fbthrift::ApplicationExceptionErrorCode::InvalidMessageType, format!("message type {:?} not handled", mty) ))); } let idx = match idx { ::std::result::Result::Ok(idx) => idx, ::std::result::Result::Err(_) => { let cur = P::into_buffer(p).reset(); return self.supa.call(cur, req_ctxt).await; } }; let res = self.handle_method(idx, &mut p, req_ctxt, seqid).await?; p.read_message_end()?; Ok(res) } fn create_interaction( &self, name: &str, ) -> ::anyhow::Result< ::std::sync::Arc<dyn ::fbthrift::ThriftService<P::Frame, Handler = (), RequestContext = R> + ::std::marker::Send + 'static> > { use ::fbthrift::{ServiceProcessor as _}; let idx = self.create_interaction_idx(name); let idx = match idx { ::anyhow::Result::Ok(idx) => idx, ::anyhow::Result::Err(_) => { return self.supa.create_interaction(name); } }; self.handle_create_interaction(idx) } } {{^service:interaction?}} /// Construct a new instance of a {{service:name}} service. /// /// This is called when a new instance of a Thrift service Processor /// is needed for a particular Thrift protocol. #[::tracing::instrument(level="debug", skip_all, fields(proto = ?proto))] pub fn make_{{service:name}}_server<F, H, R{{! }}{{#service:extends?}}, SMAKE, SS{{/service:extends?}}{{! }}>( proto: ::fbthrift::ProtocolID, handler: H,{{! }}{{#service:extends?}} supa: SMAKE,{{! }}{{/service:extends?}} ) -> ::std::result::Result<::std::boxed::Box<dyn ::fbthrift::ThriftService<F, Handler = H, RequestContext = R> + ::std::marker::Send + 'static>, ::fbthrift::ApplicationException> where F: ::fbthrift::Framing + ::std::marker::Send + ::std::marker::Sync + 'static, H: {{service:name}}{{! }}{{#service:requestContext?}}<RequestContext = R>{{/service:requestContext?}},{{! }}{{#service:extends?}} SMAKE: ::std::ops::FnOnce(::fbthrift::ProtocolID) -> ::std::result::Result<SS, ::fbthrift::ApplicationException>, SS: ::fbthrift::ThriftService<F, RequestContext = R>, SS::Handler: {{>lib/super}},{{! }}{{/service:extends?}} R: ::fbthrift::RequestContext<Name = ::std::ffi::CStr> + ::std::marker::Send + ::std::marker::Sync + 'static, <R as ::fbthrift::RequestContext>::ContextStack: ::fbthrift::ContextStack<Name = R::Name, Buffer = F::DecBuf> + ::std::marker::Send + ::std::marker::Sync + 'static { match proto { ::fbthrift::ProtocolID::BinaryProtocol => { ::std::result::Result::Ok(::std::boxed::Box::new({{service:name}}Processor::<::fbthrift::BinaryProtocol<F>, H, R{{! }}{{#service:extends?}}, SS{{/service:extends?}}{{! }}>::new(handler{{! }}{{#service:extends?}}, supa(proto)?{{/service:extends?}}{{! }}))) } ::fbthrift::ProtocolID::CompactProtocol => { ::std::result::Result::Ok(::std::boxed::Box::new({{service:name}}Processor::<::fbthrift::CompactProtocol<F>, H, R{{! }}{{#service:extends?}}, SS{{/service:extends?}}{{! }}>::new(handler{{! }}{{#service:extends?}}, supa(proto)?{{/service:extends?}}{{! }}))) } bad => { ::tracing::error!(method = "{{service:name}}.{{function:name}}", invalid_protocol = ?bad); ::std::result::Result::Err(::fbthrift::ApplicationException::invalid_protocol(bad)) } } } {{/service:interaction?}} {{!newline}}