core/bench/src/actors/producing_consumer.rs (363 lines of code) (raw):

/* Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ use crate::analytics::record::BenchmarkRecord; use crate::benchmarks::common::create_consumer; use crate::utils::batch_generator::BenchmarkBatchGenerator; use crate::utils::finish_condition::BenchmarkFinishCondition; use crate::utils::{batch_total_size_bytes, batch_user_size_bytes}; use crate::{ analytics::metrics::individual::from_records, utils::rate_limiter::BenchmarkRateLimiter, }; use human_repr::HumanCount; use iggy::prelude::*; use iggy_bench_report::actor_kind::ActorKind; use iggy_bench_report::benchmark_kind::BenchmarkKind; use iggy_bench_report::individual_metrics::BenchmarkIndividualMetrics; use iggy_bench_report::numeric_parameter::BenchmarkNumericParameter; use integration::test_server::{login_root, ClientFactory}; use std::sync::Arc; use std::time::Duration; use tokio::time::Instant; use tracing::{info, warn}; pub struct BenchmarkProducingConsumer { client_factory: Arc<dyn ClientFactory>, benchmark_kind: BenchmarkKind, actor_id: u32, consumer_group_id: Option<u32>, stream_id: u32, partitions_count: u32, messages_per_batch: BenchmarkNumericParameter, message_size: BenchmarkNumericParameter, send_finish_condition: Arc<BenchmarkFinishCondition>, poll_finish_condition: Arc<BenchmarkFinishCondition>, warmup_time: IggyDuration, sampling_time: IggyDuration, moving_average_window: u32, limit_bytes_per_second: Option<IggyByteSize>, polling_kind: PollingKind, } impl BenchmarkProducingConsumer { #[allow(clippy::too_many_arguments)] pub fn new( client_factory: Arc<dyn ClientFactory>, benchmark_kind: BenchmarkKind, actor_id: u32, consumer_group_id: Option<u32>, stream_id: u32, partitions_count: u32, messages_per_batch: BenchmarkNumericParameter, message_size: BenchmarkNumericParameter, send_finish_condition: Arc<BenchmarkFinishCondition>, poll_finish_condition: Arc<BenchmarkFinishCondition>, warmup_time: IggyDuration, sampling_time: IggyDuration, moving_average_window: u32, limit_bytes_per_second: Option<IggyByteSize>, polling_kind: PollingKind, ) -> Self { Self { client_factory, benchmark_kind, actor_id, consumer_group_id, stream_id, partitions_count, messages_per_batch, message_size, send_finish_condition, poll_finish_condition, warmup_time, sampling_time, moving_average_window, limit_bytes_per_second, polling_kind, } } pub async fn run(self) -> Result<BenchmarkIndividualMetrics, IggyError> { let topic_id: u32 = 1; let default_partition_id: u32 = 1; let client = self.client_factory.create_client().await; let client = IggyClient::create(client, None, None); login_root(&client).await; let stream_id = self.stream_id.try_into()?; let topic_id = topic_id.try_into()?; let partitioning = match self.partitions_count { 0 => panic!("Partition count must be greater than 0"), 1 => Partitioning::partition_id(default_partition_id), 2.. => Partitioning::balanced(), }; let partition_id = if self.consumer_group_id.is_some() { None } else { Some(default_partition_id) }; let consumer = create_consumer( &client, &self.consumer_group_id, &stream_id, &topic_id, self.actor_id, ) .await; let mut batch_generator = BenchmarkBatchGenerator::new(self.message_size, self.messages_per_batch); let rate_limiter = self.limit_bytes_per_second.map(BenchmarkRateLimiter::new); // ----------------------- // WARM-UP // ----------------------- if self.warmup_time.get_duration() != Duration::from_millis(0) { let warmup_end = Instant::now() + self.warmup_time.get_duration(); let mut offset: u64 = 0; let mut last_warning_time: Option<Instant> = None; let mut skipped_warnings_count: u32 = 0; if let Some(cg_id) = self.consumer_group_id { info!( "ProducingConsumer #{}, part of consumer group #{}, → warming up for {}...", self.actor_id, cg_id, self.warmup_time ); } else { info!( "ProducingConsumer #{} → warming up for {}...", self.actor_id, self.warmup_time ); } while Instant::now() < warmup_end { let batch = batch_generator.generate_batch(); client .send_messages(&stream_id, &topic_id, &partitioning, &mut batch.messages) .await?; let (strategy, auto_commit) = match self.polling_kind { PollingKind::Offset => (PollingStrategy::offset(offset), false), PollingKind::Next => (PollingStrategy::next(), true), other => panic!("Unsupported polling kind for warmup: {:?}", other), }; let polled_messages = client .poll_messages( &stream_id, &topic_id, partition_id, &consumer, &strategy, batch.messages.len() as u32, auto_commit, ) .await?; if polled_messages.messages.is_empty() { let should_warn = last_warning_time .map(|t| t.elapsed() >= Duration::from_secs(1)) .unwrap_or(true); if should_warn { warn!( "ProducingConsumer #{} (warmup) → expected {} messages but got {}, retrying... ({} warnings skipped)", self.actor_id, self.messages_per_batch, polled_messages.messages.len(), skipped_warnings_count ); last_warning_time = Some(Instant::now()); skipped_warnings_count = 0; } else { skipped_warnings_count += 1; } continue; } offset += batch.messages.len() as u64; } } // -------------------------------- // MAIN BENCHMARK LOOP // -------------------------------- info!( "ProducingConsumer #{} → sending {} and polling {} ({} msgs/batch) on stream {}, rate limit: {:?}", self.actor_id, self.send_finish_condition.total_str(), self.poll_finish_condition.total_str(), self.messages_per_batch, stream_id, self.limit_bytes_per_second ); let max_capacity = self .send_finish_condition .max_capacity() .max(self.poll_finish_condition.max_capacity()); let mut latencies: Vec<Duration> = Vec::with_capacity(max_capacity); let mut records: Vec<BenchmarkRecord> = Vec::with_capacity(max_capacity); let mut offset = 0; let mut last_warning_time: Option<Instant> = None; let mut skipped_warnings_count = 0; let mut total_user_data_bytes_processed; let mut total_bytes_processed; let mut total_messages_processed = 0; let mut total_batches_processed = 0; let mut sent_user_bytes_processed = 0; let mut sent_total_bytes_processed = 0; let mut sent_messages = 0; let mut sent_batches = 0; let mut received_user_bytes_processed = 0; let mut received_total_bytes_processed = 0; let mut received_messages = 0; let mut received_batches = 0; let mut last_received_batch_user_data_bytes; let mut rl_value = 0; let is_producer = self.send_finish_condition.total() > 0; let is_consumer = self.poll_finish_condition.total() > 0; let require_reply = is_producer && is_consumer && self.consumer_group_id.is_none(); let mut awaiting_reply = false; // meaningful only if require_reply == true let start_timestamp = Instant::now(); while !(self.send_finish_condition.is_done() && self.poll_finish_condition.is_done()) { let may_send = is_producer && !self.send_finish_condition.is_done() && (!require_reply || !awaiting_reply); if may_send { let batch = batch_generator.generate_batch(); client .send_messages(&stream_id, &topic_id, &partitioning, &mut batch.messages) .await?; rl_value += batch.user_data_bytes; sent_total_bytes_processed += batch.total_bytes; sent_user_bytes_processed += batch.user_data_bytes; sent_messages += batch.messages.len() as u32; sent_batches += 1; awaiting_reply = is_consumer; if self .send_finish_condition .account_and_check(batch.user_data_bytes) { info!( "ProducingConsumer #{} → finished sending {} messages in {} batches ({} bytes of user data, {} bytes of total data), send finish condition: {}, poll finish condition: {}", self.actor_id, sent_messages.human_count_bare(), sent_batches.human_count_bare(), sent_user_bytes_processed.human_count_bytes(), sent_total_bytes_processed.human_count_bytes(), self.send_finish_condition.status(), self.poll_finish_condition.status() ); } } if is_consumer && !self.poll_finish_condition.is_done() { let (strategy, auto_commit) = match self.polling_kind { PollingKind::Offset => (PollingStrategy::offset(offset), false), PollingKind::Next => (PollingStrategy::next(), true), other => panic!("Unsupported polling kind for benchmark: {:?}", other), }; let polled_messages = client .poll_messages( &stream_id, &topic_id, partition_id, &consumer, &strategy, self.messages_per_batch.max(), auto_commit, ) .await?; if polled_messages.messages.is_empty() { let should_warn = last_warning_time .map(|t| t.elapsed() >= Duration::from_secs(1)) .unwrap_or(true); if should_warn { warn!( "ProducingConsumer #{} → received empty batch, sent: {}, polled: {}, polling kind: {:?}, retrying... ({} warnings skipped in last second)", self.actor_id, self.send_finish_condition.status(), self.poll_finish_condition.status(), self.polling_kind, skipped_warnings_count ); last_warning_time = Some(Instant::now()); skipped_warnings_count = 0; } else { skipped_warnings_count += 1; } continue; } let now = IggyTimestamp::now().as_micros(); let latency = Duration::from_micros( now - polled_messages.messages[0].header.origin_timestamp, ); latencies.push(latency); last_received_batch_user_data_bytes = batch_user_size_bytes(&polled_messages); rl_value += last_received_batch_user_data_bytes; received_user_bytes_processed += last_received_batch_user_data_bytes; received_total_bytes_processed += batch_total_size_bytes(&polled_messages); received_messages += polled_messages.messages.len() as u32; received_batches += 1; offset += polled_messages.messages.len() as u64; total_user_data_bytes_processed = received_user_bytes_processed + sent_user_bytes_processed; total_bytes_processed = received_total_bytes_processed + sent_total_bytes_processed; total_messages_processed = received_messages + sent_messages; total_batches_processed = received_batches + sent_batches; records.push(BenchmarkRecord { elapsed_time_us: start_timestamp.elapsed().as_micros() as u64, latency_us: latency.as_micros() as u64, messages: total_messages_processed as u64, message_batches: total_batches_processed, user_data_bytes: total_user_data_bytes_processed, total_bytes: total_bytes_processed, }); if let Some(rate_limiter) = &rate_limiter { rate_limiter.wait_until_necessary(rl_value).await; rl_value = 0; } self.poll_finish_condition .account_and_check(last_received_batch_user_data_bytes); if require_reply { awaiting_reply = false; } } } let metrics = from_records( records, self.benchmark_kind, ActorKind::ProducingConsumer, self.actor_id, self.sampling_time, self.moving_average_window, ); Self::log_statistics( self.actor_id, total_messages_processed as u64, total_batches_processed, &self.messages_per_batch, &metrics, ); Ok(metrics) } fn log_statistics( actor_id: u32, total_messages: u64, total_batches: u64, messages_per_batch: &BenchmarkNumericParameter, metrics: &BenchmarkIndividualMetrics, ) { info!( "ProducingConsumer #{} → sent and received {} messages in {} batches of {} messages in {:.2} s, total size: {}, average throughput: {:.2} MB/s, \ p50 latency: {:.2} ms, p90 latency: {:.2} ms, p95 latency: {:.2} ms, p99 latency: {:.2} ms, p999 latency: {:.2} ms, p9999 latency: {:.2} ms, \ average latency: {:.2} ms, median latency: {:.2} ms, min latency: {:.2} ms, max latency: {:.2} ms, std dev latency: {:.2} ms", actor_id, total_messages.human_count_bare(), total_batches.human_count_bare(), messages_per_batch, metrics.summary.total_time_secs, IggyByteSize::from(metrics.summary.total_user_data_bytes), metrics.summary.throughput_megabytes_per_second, metrics.summary.p50_latency_ms, metrics.summary.p90_latency_ms, metrics.summary.p95_latency_ms, metrics.summary.p99_latency_ms, metrics.summary.p999_latency_ms, metrics.summary.p9999_latency_ms, metrics.summary.avg_latency_ms, metrics.summary.median_latency_ms, metrics.summary.min_latency_ms, metrics.summary.max_latency_ms, metrics.summary.std_dev_latency_ms, ); } }