rust/src/util.rs (53 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ use std::hash::Hasher; use std::sync::atomic::Ordering; use std::sync::Arc; use crate::conf::{ClientOption, ProducerOption, SimpleConsumerOption}; use siphasher::sip::SipHasher24; use crate::error::{ClientError, ErrorKind}; use crate::model::common::{Endpoints, Route}; use crate::pb::settings::PubSub; use crate::pb::telemetry_command::Command; use crate::pb::{ Language, MessageQueue, Publishing, Resource, Settings, Subscription, TelemetryCommand, Ua, }; pub(crate) static SDK_LANGUAGE: Language = Language::Rust; pub(crate) static SDK_VERSION: &str = "5.0.0"; pub(crate) static PROTOCOL_VERSION: &str = "2.0.0"; lazy_static::lazy_static! { pub(crate) static ref HOST_NAME: String = match hostname::get() { Ok(name) => name.to_str().unwrap_or("localhost").to_string(), Err(_) => "localhost".to_string(), }; } pub(crate) fn select_message_queue(route: Arc<Route>) -> MessageQueue { let i = route.index.fetch_add(1, Ordering::Relaxed); route.queue[i % route.queue.len()].clone() } pub(crate) fn select_message_queue_by_message_group( route: Arc<Route>, message_group: String, ) -> MessageQueue { let mut sip_hasher24 = SipHasher24::default(); sip_hasher24.write(message_group.as_bytes()); let index = sip_hasher24.finish() % route.queue.len() as u64; route.queue[index as usize].clone() } pub(crate) fn build_endpoints_by_message_queue( message_queue: &MessageQueue, operation: &'static str, ) -> Result<Endpoints, ClientError> { let topic = message_queue.topic.clone().unwrap().name; if message_queue.broker.is_none() { return Err(ClientError::new( ErrorKind::NoBrokerAvailable, "message queue do not have a available endpoint", operation, ) .with_context("topic", topic) .with_context("queue_id", message_queue.id.to_string())); } let broker = message_queue.broker.clone().unwrap(); if broker.endpoints.is_none() { return Err(ClientError::new( ErrorKind::NoBrokerAvailable, "message queue do not have a available endpoint", operation, ) .with_context("broker", broker.name) .with_context("topic", topic) .with_context("queue_id", message_queue.id.to_string())); } Ok(Endpoints::from_pb_endpoints(broker.endpoints.unwrap())) } pub(crate) fn build_producer_settings( option: &ProducerOption, client_options: &ClientOption, ) -> TelemetryCommand { let topics = option .topics() .clone() .unwrap_or(vec![]) .iter() .map(|topic| Resource { name: topic.to_string(), resource_namespace: option.namespace().to_string(), }) .collect(); let platform = os_info::get(); TelemetryCommand { command: Some(Command::Settings(Settings { client_type: Some(client_options.client_type.clone() as i32), request_timeout: Some(prost_types::Duration { seconds: client_options.timeout().as_secs() as i64, nanos: client_options.timeout().subsec_nanos() as i32, }), pub_sub: Some(PubSub::Publishing(Publishing { topics, validate_message_type: option.validate_message_type(), ..Publishing::default() })), user_agent: Some(Ua { language: SDK_LANGUAGE as i32, version: SDK_VERSION.to_string(), platform: format!("{} {}", platform.os_type(), platform.version()), hostname: HOST_NAME.clone(), }), ..Settings::default() })), ..TelemetryCommand::default() } } pub(crate) fn build_simple_consumer_settings( option: &SimpleConsumerOption, client_option: &ClientOption, ) -> TelemetryCommand { let platform = os_info::get(); TelemetryCommand { command: Some(Command::Settings(Settings { client_type: Some(client_option.client_type.clone() as i32), request_timeout: Some(prost_types::Duration { seconds: client_option.timeout().as_secs() as i64, nanos: client_option.timeout().subsec_nanos() as i32, }), pub_sub: Some(PubSub::Subscription(Subscription { group: Some(Resource { name: option.consumer_group().to_string(), resource_namespace: option.namespace().to_string(), }), subscriptions: vec![], fifo: Some(false), receive_batch_size: None, long_polling_timeout: Some(prost_types::Duration { seconds: client_option.long_polling_timeout().as_secs() as i64, nanos: client_option.long_polling_timeout().subsec_nanos() as i32, }), })), user_agent: Some(Ua { language: SDK_LANGUAGE as i32, version: SDK_VERSION.to_string(), platform: format!("{} {}", platform.os_type(), platform.version()), hostname: HOST_NAME.clone(), }), ..Settings::default() })), ..TelemetryCommand::default() } } #[cfg(test)] mod tests { use crate::model::common::Route; use crate::pb; use crate::pb::{Broker, MessageQueue}; use std::sync::atomic::AtomicUsize; use std::sync::Arc; use super::*; fn build_route() -> Arc<Route> { let message_queue_1 = MessageQueue { topic: None, id: 1, permission: 0, broker: None, accept_message_types: vec![], }; let message_queue_2 = MessageQueue { topic: None, id: 2, permission: 0, broker: None, accept_message_types: vec![], }; Arc::new(Route { index: AtomicUsize::new(0), queue: vec![message_queue_1, message_queue_2], }) } #[test] fn util_select_message_queue() { let route = build_route(); let message_queue = select_message_queue(route.clone()); assert_eq!(message_queue.id, 1); let message_queue = select_message_queue(route.clone()); assert_eq!(message_queue.id, 2); let message_queue = select_message_queue(route); assert_eq!(message_queue.id, 1); } #[test] fn util_select_message_queue_by_message_group() { let route = build_route(); let message_queue = select_message_queue_by_message_group(route.clone(), "group1".to_string()); assert_eq!(message_queue.id, 1); let message_queue = select_message_queue_by_message_group(route.clone(), "group1".to_string()); assert_eq!(message_queue.id, 1); let message_queue = select_message_queue_by_message_group(route, "another_group".to_string()); assert_eq!(message_queue.id, 2); } #[test] fn util_build_endpoints_by_message_queue() { let mut message_queue = MessageQueue { topic: Some(Resource { name: "topic".to_string(), resource_namespace: "".to_string(), }), id: 1, permission: 0, broker: Some(Broker { name: "".to_string(), id: 0, endpoints: Some(pb::Endpoints { scheme: pb::AddressScheme::DomainName as i32, addresses: vec![], }), }), accept_message_types: vec![], }; let result = build_endpoints_by_message_queue(&message_queue, "test"); assert!(result.is_ok()); assert_eq!(result.unwrap().scheme(), pb::AddressScheme::DomainName); message_queue.broker = Some(Broker { name: "".to_string(), id: 0, endpoints: None, }); let result = build_endpoints_by_message_queue(&message_queue, "test"); assert!(result.is_err()); let error = result.unwrap_err(); assert_eq!(error.kind, ErrorKind::NoBrokerAvailable); assert_eq!(error.operation, "test"); assert_eq!( error.message, "message queue do not have a available endpoint" ); assert_eq!(error.context.len(), 3); message_queue.broker.take(); let result = build_endpoints_by_message_queue(&message_queue, "test"); assert!(result.is_err()); let error = result.unwrap_err(); assert_eq!(error.kind, ErrorKind::NoBrokerAvailable); assert_eq!(error.operation, "test"); assert_eq!( error.message, "message queue do not have a available endpoint" ); assert_eq!(error.context.len(), 2); } #[test] fn util_build_producer_settings() { build_producer_settings(&ProducerOption::default(), &ClientOption::default()); } #[test] fn util_build_simple_consumer_settings() { build_simple_consumer_settings(&SimpleConsumerOption::default(), &ClientOption::default()); } }