in core/sdk/src/clients/consumer.rs [584:725]
/// Builds a future that sends a single poll request and reconciles the result with the
/// locally tracked per-partition offsets.
///
/// Every piece of state the request needs is copied or cloned out of `self` up front, so the
/// `async move` block owns all of its inputs.
///
/// The future resolves to:
/// - the polled batch unchanged when the response contains no messages;
/// - the batch with messages at or below the last consumed offset removed, when replay is not
///   allowed and a consumed offset is already tracked for the partition
///   (`PolledMessages::empty()` if nothing is left after filtering);
/// - an empty batch carrying the response's `current_offset`, when replay is not allowed and
///   that offset equals the tracked consumed offset. Before returning, the consumed offset is
///   stored through the client if auto-commit is enabled and the locally known stored offset
///   is behind it.
///
/// # Errors
///
/// Returns the error produced by `poll_messages` (after sleeping for the reconnection retry
/// interval when it is `Disconnected`, `Unauthenticated` or `StaleClient`), or the error
/// produced by `store_consumer_offset`.
fn create_poll_messages_future(
    &self,
) -> impl Future<Output = Result<PolledMessages, IggyError>> {
    // Client handle and request parameters.
    let client = self.client.clone();
    let stream_id = self.stream_id.clone();
    let topic_id = self.topic_id.clone();
    let requested_partition_id = self.partition_id;
    let consumer = self.consumer.clone();
    let strategy = self.polling_strategy;
    let batch_size = self.batch_size;

    // Pacing and retry settings.
    let poll_interval_micros = self.poll_interval_micros;
    let last_polled_at = self.last_polled_at.clone();
    let can_poll = self.can_poll.clone();
    let retry_interval = self.reconnection_retry_interval;

    // Offset bookkeeping shared with the rest of the consumer.
    let auto_commit_after_polling = self.auto_commit_after_polling;
    let auto_commit_enabled = self.auto_commit != AutoCommit::Disabled;
    let allow_replay = self.allow_replay;
    let consumed_offsets = self.last_consumed_offsets.clone();
    let stored_offsets = self.last_stored_offsets.clone();

    async move {
        if poll_interval_micros > 0 {
            Self::wait_before_polling(poll_interval_micros, last_polled_at.load(ORDERING))
                .await;
        }

        // The flag only delays the request by one retry interval; the poll below is sent
        // regardless of its value.
        if !can_poll.load(ORDERING) {
            trace!("Trying to poll messages in {retry_interval}...");
            sleep(retry_interval.get_duration()).await;
        }

        trace!("Sending poll messages request");
        // The timestamp is recorded before the request is sent, not after it completes.
        last_polled_at.store(IggyTimestamp::now().into(), ORDERING);
        let poll_result = client
            .read()
            .await
            .poll_messages(
                &stream_id,
                &topic_id,
                requested_partition_id,
                &consumer,
                &strategy,
                batch_size,
                auto_commit_after_polling,
            )
            .await;

        // Deal with the failure case first so the rest of the body works on a plain batch.
        let mut polled = match poll_result {
            Ok(polled) => polled,
            Err(error) => {
                error!("Failed to poll messages: {error}");
                if matches!(
                    error,
                    IggyError::Disconnected
                        | IggyError::Unauthenticated
                        | IggyError::StaleClient
                ) {
                    trace!("Retrying to poll messages in {retry_interval}...");
                    sleep(retry_interval.get_duration()).await;
                }
                return Err(error);
            }
        };

        if polled.messages.is_empty() {
            return Ok(polled);
        }

        // From here on the partition reported in the response is used, not the requested one.
        let partition_id = polled.partition_id;

        // Read the last consumed offset, registering the partition with 0 on first sight.
        let (consumed_offset, has_consumed_offset) = match consumed_offsets.get(&partition_id)
        {
            Some(entry) => (entry.load(ORDERING), true),
            None => {
                consumed_offsets.insert(partition_id, AtomicU64::new(0));
                (0, false)
            }
        };

        // Filtering is skipped the first time a partition is seen, because the freshly
        // inserted 0 is a placeholder rather than a real consumed offset.
        let skip_replayed = !allow_replay && has_consumed_offset;
        if skip_replayed {
            polled
                .messages
                .retain(|message| message.header.offset > consumed_offset);
            // NOTE(review): `polled.count` is left as received even though messages may have
            // been removed here — confirm whether callers rely on it matching `messages`.
            if polled.messages.is_empty() {
                return Ok(PolledMessages::empty());
            }
        }

        // With auto-commit-after-polling the local stored offset is set to the consumed
        // offset (presumably mirroring what the server did for this request — confirm);
        // otherwise the previously recorded value is read, defaulting to 0.
        let stored_offset = match stored_offsets.get(&partition_id) {
            Some(entry) if auto_commit_after_polling => {
                entry.store(consumed_offset, ORDERING);
                consumed_offset
            }
            Some(entry) => entry.load(ORDERING),
            None => {
                let initial = if auto_commit_after_polling {
                    consumed_offset
                } else {
                    0
                };
                stored_offsets.insert(partition_id, AtomicU64::new(initial));
                initial
            }
        };

        trace!(
            "Last consumed offset: {consumed_offset}, current offset: {}, stored offset: {stored_offset}, in partition ID: {partition_id}, topic: {topic_id}, stream: {stream_id}, consumer: {consumer}",
            polled.current_offset
        );

        // Common case: there is something new, hand the batch to the caller.
        if !(skip_replayed && polled.current_offset == consumed_offset) {
            return Ok(polled);
        }

        trace!("No new messages to consume in partition ID: {partition_id}, topic: {topic_id}, stream: {stream_id}, consumer: {consumer}");
        if auto_commit_enabled && stored_offset < consumed_offset {
            trace!("Auto-committing the offset: {consumed_offset} in partition ID: {partition_id}, topic: {topic_id}, stream: {stream_id}, consumer: {consumer}");
            client
                .read()
                .await
                .store_consumer_offset(
                    &consumer,
                    &stream_id,
                    &topic_id,
                    Some(partition_id),
                    consumed_offset,
                )
                .await?;
            // Record the commit locally only after the store request succeeded.
            match stored_offsets.get(&partition_id) {
                Some(entry) => entry.store(consumed_offset, ORDERING),
                None => {
                    stored_offsets.insert(partition_id, AtomicU64::new(consumed_offset));
                }
            };
        }

        Ok(PolledMessages {
            messages: vec![],
            current_offset: polled.current_offset,
            partition_id,
            count: 0,
        })
    }
}