remoting/base/src/codec.rs (107 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ use std::sync::Arc; use anyhow::{anyhow, Error}; use bytes::{self, Bytes}; use dashmap::DashMap; use protocol_base::ProtocolName; use crate::{error::CodecError, Response}; #[derive(Clone)] pub struct BoxedCodec(Arc<dyn Codec>); impl BoxedCodec { pub fn new(codec: Arc<dyn Codec>) -> Self { BoxedCodec(codec) } } pub trait Codec: Sync + Send { fn encode_request(&self) -> Result<Bytes, CodecError>; fn encode_response(&self) -> Result<Bytes, CodecError>; fn decode(&self, bytes: Bytes) -> Result<CodecResult, CodecError>; } pub struct CodecRegistry { registry: DashMap<ProtocolName, BoxedCodec>, } #[derive(Default)] pub struct CodecResult { is_request: bool, // heartbeat flag result: Option<Response>, } impl Default for CodecRegistry { fn default() -> Self { CodecRegistry { registry: DashMap::new(), } } } impl CodecRegistry { pub fn get_codec(&self, protocol: ProtocolName) -> Option<BoxedCodec> { let registry_map = &self.registry; if let true = registry_map.contains_key(protocol) { let option = registry_map.get(protocol); let codec = option.as_deref().unwrap(); Some(codec.clone()) } else { None } } pub fn set_codec( &mut self, protocol: ProtocolName, codec: BoxedCodec, ) -> anyhow::Result<(), CodecError> { if let true = self.registry.contains_key(protocol) { return Err(CodecError::RegistryExistsProtocol(protocol)); } else { self.registry.insert(protocol, codec); } Ok(()) } pub fn is_registered(&self, protocol: ProtocolName) -> bool { self.registry.contains_key(protocol) } } #[cfg(test)] mod tests { use std::sync::Arc; use bytes::Bytes; use crate::{ codec::{BoxedCodec, CodecRegistry, CodecResult}, error::CodecError, Codec, }; #[derive(Default)] struct TestCodec; impl Codec for TestCodec { fn encode_request(&self) -> Result<Bytes, CodecError> { Ok(Bytes::new()) } fn encode_response(&self) -> Result<Bytes, CodecError> { Ok(Bytes::new()) } fn decode(&self, bytes: Bytes) -> Result<CodecResult, CodecError> { Ok(CodecResult::default()) } } #[test] fn test_registry() { let mut codec_registry = CodecRegistry::default(); codec_registry .set_codec("test", BoxedCodec(Arc::new(TestCodec::default()))) .unwrap(); assert!(codec_registry.is_registered("test")); } }