core/sdk/src/clients/producer.rs (675 lines of code) (raw):
/* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
use crate::client::Client;
use crate::compression::compression_algorithm::CompressionAlgorithm;
use crate::diagnostic::DiagnosticEvent;
use crate::error::IggyError;
use crate::identifier::{IdKind, Identifier};
use crate::locking::{IggySharedMut, IggySharedMutFn};
use crate::partitioner::Partitioner;
use crate::prelude::IggyMessage;
use crate::prelude::Partitioning;
use crate::utils::crypto::EncryptorKind;
use crate::utils::duration::IggyDuration;
use crate::utils::expiry::IggyExpiry;
use crate::utils::timestamp::IggyTimestamp;
use crate::utils::topic_size::MaxTopicSize;
use bytes::Bytes;
use futures_util::StreamExt;
use std::sync::atomic::{AtomicBool, AtomicU64};
use std::sync::Arc;
use std::time::Duration;
use tokio::time::{sleep, Interval};
use tracing::{error, info, trace, warn};
/// Upper bound on the number of messages sent in a single request; used as the batch size
/// when none is configured and as the cap applied by the builder's `batch_size` setter.
const MAX_BATCH_SIZE: usize = 1_000_000;
/// Memory ordering used for every atomic load/store in this module.
const ORDERING: std::sync::atomic::Ordering = std::sync::atomic::Ordering::SeqCst;
// SAFETY / NOTE(review): these manual impls opt `IggyProducer` out of the compiler's automatic
// `Send`/`Sync` checking. Its fields are `Arc`s, atomics, `String`s, `Option`s and small value
// types, so the auto traits would be derived anyway IF every wrapped type is `Send + Sync` —
// notably `dyn Client` (behind `IggySharedMut`), `dyn Partitioner` and `EncryptorKind`. Those
// bounds are not visible in this file. If they hold, these impls are redundant and could be
// removed; if they do not, these impls are unsound. Confirm before relying on either.
unsafe impl Send for IggyProducer {}
unsafe impl Sync for IggyProducer {}
/// Producer that sends messages to a stream/topic through a shared client.
///
/// Built via [`IggyProducerBuilder::build`]; [`IggyProducer::init`] must be awaited before
/// sending. Depending on configuration, payloads are encrypted, a partitioning is chosen, the
/// messages are split into batches and sent either immediately or spaced by a send interval,
/// optionally with retries.
pub struct IggyProducer {
    /// Set to `true` by `init` once setup succeeded; later `init` calls return early.
    initialized: bool,
    /// Updated by the diagnostic-event task spawned from `init`; checked before sending when
    /// retries are enabled.
    can_send: Arc<AtomicBool>,
    /// Shared client handle; read-locked to issue requests.
    client: Arc<IggySharedMut<Box<dyn Client>>>,
    /// Identifier of the default destination stream.
    stream_id: Arc<Identifier>,
    /// Stream name used when auto-creating a numerically identified stream and in errors/logs.
    stream_name: String,
    /// Identifier of the default destination topic.
    topic_id: Arc<Identifier>,
    /// Topic name used for auto-creation and in errors.
    topic_name: String,
    /// Maximum number of messages per request; `None` falls back to `MAX_BATCH_SIZE`.
    batch_size: Option<usize>,
    /// Producer-level partitioning, used when a call supplies none and no partitioner is set.
    partitioning: Option<Arc<Partitioning>>,
    /// When set, every payload is encrypted before being sent.
    encryptor: Option<Arc<EncryptorKind>>,
    /// Custom partitioner; when set it takes precedence over any partitioning.
    partitioner: Option<Arc<dyn Partitioner>>,
    /// Minimum spacing between buffered batches, in microseconds (`0` = no spacing).
    send_interval_micros: u64,
    /// Whether `init` may create the stream when it does not exist.
    create_stream_if_not_exists: bool,
    /// Whether `init` may create the topic when it does not exist.
    create_topic_if_not_exists: bool,
    /// Partition count used when auto-creating the topic.
    topic_partitions_count: u32,
    /// Replication factor used when auto-creating the topic.
    topic_replication_factor: Option<u8>,
    /// Message expiry used when auto-creating the topic.
    topic_message_expiry: IggyExpiry,
    /// Maximum size used when auto-creating the topic.
    topic_max_size: MaxTopicSize,
    /// Fallback partitioning (`Partitioning::balanced()`).
    default_partitioning: Arc<Partitioning>,
    /// `true` when no send interval was configured; sends then skip the buffered (spaced) path.
    can_send_immediately: bool,
    /// Value of `IggyTimestamp::now()` at the last send attempt; compared against
    /// `send_interval_micros`, so presumably microseconds — confirm.
    last_sent_at: Arc<AtomicU64>,
    /// Maximum number of retries per request (`None` or `0` = single attempt).
    send_retries_count: Option<u32>,
    /// Wait between retries (`None` = retry without waiting).
    send_retries_interval: Option<IggyDuration>,
}
impl IggyProducer {
    /// Creates a producer that has not been initialized yet.
    ///
    /// The sending mode follows `interval`: with `None` every `send*` call is dispatched right
    /// away; with `Some(interval)` consecutive batches are spaced by at least that interval
    /// (kept in microseconds via `IggyDuration::as_micros`).
    ///
    /// Note: [`IggyProducer::init`] must be invoked before producing messages.
    #[allow(clippy::too_many_arguments)] // one argument per builder setting
    pub(crate) fn new(
        client: IggySharedMut<Box<dyn Client>>,
        stream: Identifier,
        stream_name: String,
        topic: Identifier,
        topic_name: String,
        batch_size: Option<usize>,
        partitioning: Option<Partitioning>,
        encryptor: Option<Arc<EncryptorKind>>,
        partitioner: Option<Arc<dyn Partitioner>>,
        interval: Option<IggyDuration>,
        create_stream_if_not_exists: bool,
        create_topic_if_not_exists: bool,
        topic_partitions_count: u32,
        topic_replication_factor: Option<u8>,
        topic_message_expiry: IggyExpiry,
        topic_max_size: MaxTopicSize,
        send_retries_count: Option<u32>,
        send_retries_interval: Option<IggyDuration>,
    ) -> Self {
        Self {
            initialized: false,
            client: Arc::new(client),
            // Sending starts enabled; the diagnostic-event task started by `init` keeps it updated.
            can_send: Arc::new(AtomicBool::new(true)),
            stream_id: Arc::new(stream),
            stream_name,
            topic_id: Arc::new(topic),
            topic_name,
            batch_size,
            partitioning: partitioning.map(Arc::new),
            encryptor,
            partitioner,
            send_interval_micros: interval.map_or(0, |i| i.as_micros()),
            create_stream_if_not_exists,
            create_topic_if_not_exists,
            topic_partitions_count,
            topic_replication_factor,
            topic_message_expiry,
            topic_max_size,
            default_partitioning: Arc::new(Partitioning::balanced()),
            can_send_immediately: interval.is_none(),
            last_sent_at: Arc::new(AtomicU64::new(0)),
            send_retries_count,
            send_retries_interval,
        }
    }

    /// Returns the identifier of the producer's default stream.
    pub fn stream(&self) -> &Identifier {
        &self.stream_id
    }

    /// Returns the identifier of the producer's default topic.
    pub fn topic(&self) -> &Identifier {
        &self.topic_id
    }

    /// Initializes the producer by subscribing to diagnostic events, creating the stream and topic if they do not exist etc.
    ///
    /// Calling it again after a successful initialization is a no-op.
    ///
    /// Note: This method must be invoked before producing messages.
    ///
    /// # Errors
    ///
    /// Returns [`IggyError::StreamNameNotFound`] / [`IggyError::TopicNameNotFound`] when the
    /// stream / topic is missing and its auto-creation is disabled, and propagates any error
    /// returned by the client while looking up or creating them.
    pub async fn init(&mut self) -> Result<(), IggyError> {
        if self.initialized {
            return Ok(());
        }

        let stream_id = self.stream_id.clone();
        let topic_id = self.topic_id.clone();
        info!("Initializing producer for stream: {stream_id} and topic: {topic_id}...");
        self.subscribe_events().await;
        let client = self.client.clone();
        let client = client.read().await;

        if client.get_stream(&stream_id).await?.is_none() {
            if !self.create_stream_if_not_exists {
                error!("Stream does not exist and auto-creation is disabled.");
                return Err(IggyError::StreamNameNotFound(self.stream_name.clone()));
            }

            // A numeric identifier is passed as the explicit stream ID (named `stream_name`);
            // a string identifier is used as the name and no explicit ID is passed.
            let (name, id) = match stream_id.kind {
                IdKind::Numeric => (
                    self.stream_name.to_owned(),
                    Some(self.stream_id.get_u32_value()?),
                ),
                IdKind::String => (self.stream_id.get_string_value()?, None),
            };
            info!("Creating stream: {name}");
            client.create_stream(&name, id).await?;
        }

        if client.get_topic(&stream_id, &topic_id).await?.is_none() {
            if !self.create_topic_if_not_exists {
                error!("Topic does not exist and auto-creation is disabled.");
                return Err(IggyError::TopicNameNotFound(
                    self.topic_name.clone(),
                    self.stream_name.clone(),
                ));
            }

            // Same naming rule as for the stream above.
            let (name, id) = match topic_id.kind {
                IdKind::Numeric => (
                    self.topic_name.to_owned(),
                    Some(self.topic_id.get_u32_value()?),
                ),
                IdKind::String => (self.topic_id.get_string_value()?, None),
            };
            info!("Creating topic: {name} for stream: {}", self.stream_name);
            // Create the topic under the resolved `name`, so a topic addressed by a string
            // identifier can later be found by that very identifier.
            client
                .create_topic(
                    &stream_id,
                    &name,
                    self.topic_partitions_count,
                    CompressionAlgorithm::None,
                    self.topic_replication_factor,
                    id,
                    self.topic_message_expiry,
                    self.topic_max_size,
                )
                .await?;
        }

        self.initialized = true;
        info!("Producer has been initialized for stream: {stream_id} and topic: {topic_id}.");
        Ok(())
    }

    /// Spawns a background task that mirrors the client's diagnostic events into `can_send`.
    ///
    /// Only `SignedIn` enables sending; `Shutdown`, `Connected`, `Disconnected` and `SignedOut`
    /// disable it (`Connected` presumably because the connection is not usable until the
    /// following `SignedIn`).
    async fn subscribe_events(&self) {
        trace!("Subscribing to diagnostic events");
        let mut receiver;
        {
            // Keep the read lock only for the subscription call.
            let client = self.client.read().await;
            receiver = client.subscribe_events().await;
        }

        let can_send = self.can_send.clone();
        tokio::spawn(async move {
            while let Some(event) = receiver.next().await {
                trace!("Received diagnostic event: {event}");
                match event {
                    DiagnosticEvent::Shutdown => {
                        can_send.store(false, ORDERING);
                        warn!("Client has been shutdown");
                    }
                    DiagnosticEvent::Connected => {
                        can_send.store(false, ORDERING);
                        trace!("Connected to the server");
                    }
                    DiagnosticEvent::Disconnected => {
                        can_send.store(false, ORDERING);
                        warn!("Disconnected from the server");
                    }
                    DiagnosticEvent::SignedIn => {
                        can_send.store(true, ORDERING);
                    }
                    DiagnosticEvent::SignedOut => {
                        can_send.store(false, ORDERING);
                    }
                }
            }
        });
    }

    /// Sends `messages` to the producer's default stream and topic using the configured
    /// partitioner/partitioning. An empty `messages` vector is a no-op.
    pub async fn send(&self, messages: Vec<IggyMessage>) -> Result<(), IggyError> {
        self.send_to(self.stream_id.clone(), self.topic_id.clone(), messages, None)
            .await
    }

    /// Sends a single message to the default stream and topic; see [`Self::send`].
    pub async fn send_one(&self, message: IggyMessage) -> Result<(), IggyError> {
        self.send(vec![message]).await
    }

    /// Sends `messages` to the default stream and topic with an explicit `partitioning` for
    /// this call. A configured custom partitioner still takes precedence.
    pub async fn send_with_partitioning(
        &self,
        messages: Vec<IggyMessage>,
        partitioning: Option<Arc<Partitioning>>,
    ) -> Result<(), IggyError> {
        self.send_to(
            self.stream_id.clone(),
            self.topic_id.clone(),
            messages,
            partitioning,
        )
        .await
    }

    /// Sends `messages` to the given `stream` and `topic`.
    ///
    /// Messages are sent immediately when no send interval is configured, otherwise batch by
    /// batch with the configured spacing. An empty `messages` vector is a no-op.
    pub async fn send_to(
        &self,
        stream: Arc<Identifier>,
        topic: Arc<Identifier>,
        messages: Vec<IggyMessage>,
        partitioning: Option<Arc<Partitioning>>,
    ) -> Result<(), IggyError> {
        if messages.is_empty() {
            trace!("No messages to send.");
            return Ok(());
        }

        if self.can_send_immediately {
            // Always honor the caller-supplied destination, not the producer's defaults.
            return self
                .send_immediately(&stream, &topic, messages, partitioning)
                .await;
        }

        self.send_buffered(stream, topic, messages, partitioning)
            .await
    }

    /// Encrypts, partitions and sends `messages` to `stream`/`topic` batch by batch, waiting
    /// before each batch until `send_interval_micros` has elapsed since the previous send.
    async fn send_buffered(
        &self,
        stream: Arc<Identifier>,
        topic: Arc<Identifier>,
        mut messages: Vec<IggyMessage>,
        partitioning: Option<Arc<Partitioning>>,
    ) -> Result<(), IggyError> {
        self.encrypt_messages(&mut messages)?;
        let partitioning = self.get_partitioning(&stream, &topic, &messages, partitioning)?;
        let batches = messages.chunks_mut(self.effective_batch_size());
        let batches_count = batches.len();
        for (index, batch) in batches.enumerate() {
            let current_batch = index + 1;
            if self.send_interval_micros > 0 {
                Self::wait_before_sending(
                    self.send_interval_micros,
                    self.last_sent_at.load(ORDERING),
                )
                .await;
            }

            let messages_count = batch.len();
            trace!(
                "Sending {messages_count} messages ({current_batch}/{batches_count} batch(es))..."
            );
            self.last_sent_at
                .store(IggyTimestamp::now().into(), ORDERING);
            // Send to the same destination the partitioning was computed for.
            self.try_send_messages(&stream, &topic, &partitioning, batch)
                .await?;
            trace!("Sent {messages_count} messages ({current_batch}/{batches_count} batch(es)).");
        }
        Ok(())
    }

    /// Encrypts, partitions and sends `messages` to `stream`/`topic` without any spacing,
    /// split into requests of at most the effective batch size.
    async fn send_immediately(
        &self,
        stream: &Identifier,
        topic: &Identifier,
        mut messages: Vec<IggyMessage>,
        partitioning: Option<Arc<Partitioning>>,
    ) -> Result<(), IggyError> {
        trace!("No send interval specified, sending messages immediately.");
        self.encrypt_messages(&mut messages)?;
        let partitioning = self.get_partitioning(stream, topic, &messages, partitioning)?;
        // When everything fits into one batch this yields a single chunk, i.e. one request.
        for batch in messages.chunks_mut(self.effective_batch_size()) {
            self.last_sent_at
                .store(IggyTimestamp::now().into(), ORDERING);
            self.try_send_messages(stream, topic, &partitioning, batch)
                .await?;
        }
        Ok(())
    }

    /// Returns the number of messages per request.
    ///
    /// This is the configured batch size, or `MAX_BATCH_SIZE` when none is configured. A zero
    /// size also falls back to `MAX_BATCH_SIZE`, because `chunks_mut` panics on a chunk size of 0.
    fn effective_batch_size(&self) -> usize {
        self.batch_size
            .filter(|&size| size > 0)
            .unwrap_or(MAX_BATCH_SIZE)
    }

    /// Sleeps for whatever part of `interval` (microseconds) has not yet elapsed since `last_sent_at`.
    ///
    /// The elapsed time saturates at zero. If `last_sent_at` is ahead of the current timestamp
    /// (e.g. the clock moved backwards), the full interval is waited instead of underflowing.
    async fn wait_before_sending(interval: u64, last_sent_at: u64) {
        if interval == 0 {
            return;
        }

        let now: u64 = IggyTimestamp::now().into();
        let elapsed = now.saturating_sub(last_sent_at);
        if elapsed >= interval {
            trace!("No need to wait before sending messages. {now} - {last_sent_at} = {elapsed}");
            return;
        }

        let remaining = interval - elapsed;
        trace!("Waiting for {remaining} microseconds before sending messages... {interval} - {elapsed} = {remaining}");
        sleep(Duration::from_micros(remaining)).await;
    }

    /// Encrypts every payload in place when an encryptor is configured.
    ///
    /// The header's `payload_length` is updated to the encrypted size.
    fn encrypt_messages(&self, messages: &mut [IggyMessage]) -> Result<(), IggyError> {
        if let Some(encryptor) = &self.encryptor {
            for message in messages {
                message.payload = Bytes::from(encryptor.encrypt(&message.payload)?);
                message.header.payload_length = message.payload.len() as u32;
            }
        }
        Ok(())
    }

    /// Sends one batch, applying the configured retry policy.
    ///
    /// With no retries configured (`None` or `0`) the batch is sent exactly once. Otherwise the
    /// method first waits (at most `max_retries` checks) until `can_send` is set, then retries
    /// failed sends up to `max_retries` times. Between attempts it waits for the retry
    /// interval, if one is configured.
    async fn try_send_messages(
        &self,
        stream: &Identifier,
        topic: &Identifier,
        partitioning: &Arc<Partitioning>,
        messages: &mut [IggyMessage],
    ) -> Result<(), IggyError> {
        let max_retries = self.send_retries_count.unwrap_or(0);
        if max_retries == 0 {
            return self
                .send_messages_once(stream, topic, partitioning, messages)
                .await;
        }

        // `tokio::time::interval` panics on a zero period, so a zero retry interval is treated
        // like "no interval" (retry without waiting).
        let mut timer = match self.send_retries_interval {
            Some(interval) if !interval.get_duration().is_zero() => {
                let mut timer = tokio::time::interval(interval.get_duration());
                // The first tick completes immediately; consume it so that later ticks wait
                // for a full interval.
                timer.tick().await;
                Some(timer)
            }
            _ => None,
        };

        // No client lock is held here: it is taken per request inside `send_messages_once`.
        // Holding an outer read guard while taking another read lock can deadlock once a
        // writer is queued on a fair RwLock.
        self.wait_until_connected(max_retries, stream, topic, &mut timer)
            .await?;
        self.send_with_retries(
            max_retries,
            stream,
            topic,
            partitioning,
            messages,
            &mut timer,
        )
        .await
    }

    /// Issues a single send request, holding the client's read lock only for its duration.
    async fn send_messages_once(
        &self,
        stream: &Identifier,
        topic: &Identifier,
        partitioning: &Arc<Partitioning>,
        messages: &mut [IggyMessage],
    ) -> Result<(), IggyError> {
        let client = self.client.read().await;
        client
            .send_messages(stream, topic, partitioning, messages)
            .await
    }

    /// Waits until `can_send` is set, checking at most `max_retries` more times.
    ///
    /// Between checks it ticks `timer` when present, and otherwise yields to the runtime.
    ///
    /// # Errors
    ///
    /// Returns [`IggyError::CannotSendMessagesDueToClientDisconnection`] when the retries are exhausted.
    async fn wait_until_connected(
        &self,
        max_retries: u32,
        stream: &Identifier,
        topic: &Identifier,
        timer: &mut Option<Interval>,
    ) -> Result<(), IggyError> {
        let mut retries = 0;
        while !self.can_send.load(ORDERING) {
            retries += 1;
            if retries > max_retries {
                error!(
                    "Failed to send messages to topic: {topic}, stream: {stream} \
                     after {max_retries} retries. Client is disconnected."
                );
                return Err(IggyError::CannotSendMessagesDueToClientDisconnection);
            }

            error!(
                "Trying to send messages to topic: {topic}, stream: {stream} \
                 but the client is disconnected. Retrying {retries}/{max_retries}..."
            );

            if let Some(timer) = timer.as_mut() {
                trace!(
                    "Waiting for the next retry to send messages to topic: {topic}, \
                     stream: {stream} for disconnected client..."
                );
                timer.tick().await;
            } else {
                // Without a retry interval, still yield. A loop with no await point would
                // starve the diagnostic-event task (e.g. on a current-thread runtime), so
                // `can_send` could never change.
                tokio::task::yield_now().await;
            }
        }
        Ok(())
    }

    /// Sends the batch, retrying failed requests up to `max_retries` times.
    ///
    /// Between attempts it ticks `timer` when present. The last error is returned once the
    /// retries are exhausted.
    async fn send_with_retries(
        &self,
        max_retries: u32,
        stream: &Identifier,
        topic: &Identifier,
        partitioning: &Arc<Partitioning>,
        messages: &mut [IggyMessage],
        timer: &mut Option<Interval>,
    ) -> Result<(), IggyError> {
        let mut retries = 0;
        loop {
            match self
                .send_messages_once(stream, topic, partitioning, messages)
                .await
            {
                Ok(()) => return Ok(()),
                Err(error) => {
                    retries += 1;
                    if retries > max_retries {
                        error!(
                            "Failed to send messages to topic: {topic}, stream: {stream} \
                             after {max_retries} retries. {error}."
                        );
                        return Err(error);
                    }

                    error!(
                        "Failed to send messages to topic: {topic}, stream: {stream}. \
                         {error} Retrying {retries}/{max_retries}..."
                    );

                    if let Some(t) = timer.as_mut() {
                        trace!(
                            "Waiting for the next retry to send messages to topic: {topic}, \
                             stream: {stream}..."
                        );
                        t.tick().await;
                    }
                }
            }
        }
    }

    /// Resolves the partitioning for a send request.
    ///
    /// The first available option is used:
    /// 1. the custom partitioner, which computes a partition ID from these messages;
    /// 2. the per-call `partitioning`;
    /// 3. the producer-level partitioning;
    /// 4. the balanced default.
    fn get_partitioning(
        &self,
        stream: &Identifier,
        topic: &Identifier,
        messages: &[IggyMessage],
        partitioning: Option<Arc<Partitioning>>,
    ) -> Result<Arc<Partitioning>, IggyError> {
        if let Some(partitioner) = &self.partitioner {
            trace!("Calculating partition id using custom partitioner.");
            let partition_id = partitioner.calculate_partition_id(stream, topic, messages)?;
            Ok(Arc::new(Partitioning::partition_id(partition_id)))
        } else {
            trace!("Using the provided partitioning.");
            Ok(partitioning.unwrap_or_else(|| {
                self.partitioning
                    .clone()
                    .unwrap_or_else(|| self.default_partitioning.clone())
            }))
        }
    }
}
/// Builder for [`IggyProducer`]: adjust settings with the chained setters, then call
/// [`IggyProducerBuilder::build`] (and `init()` on the result).
#[derive(Debug)]
pub struct IggyProducerBuilder {
    /// Client handle handed over to the built producer.
    client: IggySharedMut<Box<dyn Client>>,
    /// Identifier of the destination stream.
    stream: Identifier,
    /// Stream name used for auto-creation and in errors.
    stream_name: String,
    /// Identifier of the destination topic.
    topic: Identifier,
    /// Topic name used for auto-creation and in errors.
    topic_name: String,
    /// Maximum number of messages per request (`None` = no explicit batch size).
    batch_size: Option<usize>,
    /// Producer-level partitioning strategy.
    partitioning: Option<Partitioning>,
    /// Optional payload encryptor.
    encryptor: Option<Arc<EncryptorKind>>,
    /// Optional custom partitioner (takes precedence over `partitioning`).
    partitioner: Option<Arc<dyn Partitioner>>,
    /// Spacing between sent batches (`None` = send immediately).
    send_interval: Option<IggyDuration>,
    /// Whether `init` may create the stream when it does not exist.
    create_stream_if_not_exists: bool,
    /// Whether `init` may create the topic when it does not exist.
    create_topic_if_not_exists: bool,
    /// Partition count for an auto-created topic.
    topic_partitions_count: u32,
    /// Replication factor for an auto-created topic.
    topic_replication_factor: Option<u8>,
    /// Maximum number of retries per request.
    send_retries_count: Option<u32>,
    /// Wait between retries.
    send_retries_interval: Option<IggyDuration>,
    /// Message expiry for an auto-created topic.
    topic_message_expiry: IggyExpiry,
    /// Maximum size for an auto-created topic.
    topic_max_size: MaxTopicSize,
}
impl IggyProducerBuilder {
    /// Creates a builder for the given stream and topic with these defaults:
    /// - batches of 1000 messages;
    /// - a send interval of `IggyDuration::from(1000)`;
    /// - automatic creation of the stream and of a 1-partition topic, with no explicit
    ///   replication factor and server-default message expiry and max size;
    /// - up to 3 send retries spaced by `IggyDuration::ONE_SECOND`.
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        client: IggySharedMut<Box<dyn Client>>,
        stream: Identifier,
        stream_name: String,
        topic: Identifier,
        topic_name: String,
        encryptor: Option<Arc<EncryptorKind>>,
        partitioner: Option<Arc<dyn Partitioner>>,
    ) -> Self {
        Self {
            client,
            stream,
            stream_name,
            topic,
            topic_name,
            batch_size: Some(1000),
            partitioning: None,
            encryptor,
            partitioner,
            send_interval: Some(IggyDuration::from(1000)),
            create_stream_if_not_exists: true,
            create_topic_if_not_exists: true,
            topic_partitions_count: 1,
            topic_replication_factor: None,
            topic_message_expiry: IggyExpiry::ServerDefault,
            topic_max_size: MaxTopicSize::ServerDefault,
            send_retries_count: Some(3),
            send_retries_interval: Some(IggyDuration::ONE_SECOND),
        }
    }

    /// Sets the stream identifier.
    pub fn stream(self, stream: Identifier) -> Self {
        Self { stream, ..self }
    }

    /// Sets the topic identifier.
    pub fn topic(self, topic: Identifier) -> Self {
        Self { topic, ..self }
    }

    /// Sets the number of messages to batch before sending them, can be combined with `send_interval`.
    ///
    /// A value of `0` clears the batch size (same as [`Self::without_batch_size`]). Larger
    /// values are capped at the producer's `MAX_BATCH_SIZE`.
    pub fn batch_size(self, batch_size: u32) -> Self {
        Self {
            batch_size: if batch_size == 0 {
                None
            } else {
                Some(batch_size.min(MAX_BATCH_SIZE as u32) as usize)
            },
            ..self
        }
    }

    /// Clears the batch size.
    pub fn without_batch_size(self) -> Self {
        Self {
            batch_size: None,
            ..self
        }
    }

    /// Sets the interval between sending the messages, can be combined with `batch_size`.
    pub fn send_interval(self, interval: IggyDuration) -> Self {
        Self {
            send_interval: Some(interval),
            ..self
        }
    }

    /// Clears the interval, so messages are sent as soon as they are passed to the producer.
    pub fn without_send_interval(self) -> Self {
        Self {
            send_interval: None,
            ..self
        }
    }

    /// Sets the encryptor for encrypting the messages' payloads.
    pub fn encryptor(self, encryptor: Arc<EncryptorKind>) -> Self {
        Self {
            encryptor: Some(encryptor),
            ..self
        }
    }

    /// Clears the encryptor for encrypting the messages' payloads.
    pub fn without_encryptor(self) -> Self {
        Self {
            encryptor: None,
            ..self
        }
    }

    /// Sets the partitioning strategy for messages.
    pub fn partitioning(self, partitioning: Partitioning) -> Self {
        Self {
            partitioning: Some(partitioning),
            ..self
        }
    }

    /// Clears the partitioning strategy.
    pub fn without_partitioning(self) -> Self {
        Self {
            partitioning: None,
            ..self
        }
    }

    /// Sets the partitioner for messages. When set, it takes precedence over any partitioning strategy.
    pub fn partitioner(self, partitioner: Arc<dyn Partitioner>) -> Self {
        Self {
            partitioner: Some(partitioner),
            ..self
        }
    }

    /// Clears the partitioner.
    pub fn without_partitioner(self) -> Self {
        Self {
            partitioner: None,
            ..self
        }
    }

    /// Creates the stream if it does not exist - requires user to have the necessary permissions.
    pub fn create_stream_if_not_exists(self) -> Self {
        Self {
            create_stream_if_not_exists: true,
            ..self
        }
    }

    /// Does not create the stream if it does not exist.
    pub fn do_not_create_stream_if_not_exists(self) -> Self {
        Self {
            create_stream_if_not_exists: false,
            ..self
        }
    }

    /// Creates the topic if it does not exist - requires user to have the necessary permissions.
    ///
    /// The given partition count, replication factor, message expiry and max size are used
    /// when the topic is created.
    pub fn create_topic_if_not_exists(
        self,
        partitions_count: u32,
        replication_factor: Option<u8>,
        message_expiry: IggyExpiry,
        max_size: MaxTopicSize,
    ) -> Self {
        Self {
            create_topic_if_not_exists: true,
            topic_partitions_count: partitions_count,
            topic_replication_factor: replication_factor,
            topic_message_expiry: message_expiry,
            topic_max_size: max_size,
            ..self
        }
    }

    /// Does not create the topic if it does not exist.
    pub fn do_not_create_topic_if_not_exists(self) -> Self {
        Self {
            create_topic_if_not_exists: false,
            ..self
        }
    }

    /// Sets the retry policy (maximum number of retries and interval between them) in case of messages sending failure.
    /// The error can be related either to disconnecting from the server or to the server rejecting the messages.
    /// Default is 3 retries with 1 second interval between them.
    ///
    /// `None` (or `Some(0)`) for `retries` sends each batch exactly once. `None` for `interval`
    /// retries without waiting.
    pub fn send_retries(self, retries: Option<u32>, interval: Option<IggyDuration>) -> Self {
        Self {
            send_retries_count: retries,
            send_retries_interval: interval,
            ..self
        }
    }

    /// Builds the producer.
    ///
    /// Note: After building the producer, `init()` must be invoked before producing messages.
    pub fn build(self) -> IggyProducer {
        IggyProducer::new(
            self.client,
            self.stream,
            self.stream_name,
            self.topic,
            self.topic_name,
            self.batch_size,
            self.partitioning,
            self.encryptor,
            self.partitioner,
            self.send_interval,
            self.create_stream_if_not_exists,
            self.create_topic_if_not_exists,
            self.topic_partitions_count,
            self.topic_replication_factor,
            self.topic_message_expiry,
            self.topic_max_size,
            self.send_retries_count,
            self.send_retries_interval,
        )
    }
}