in core/sdk/src/clients/consumer.rs [821:951]
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let partition_id = self.current_partition_id.load(ORDERING);
if let Some(message) = self.buffered_messages.pop_front() {
{
if let Some(last_consumed_offset_entry) =
self.last_consumed_offsets.get(&partition_id)
{
last_consumed_offset_entry.store(message.header.offset, ORDERING);
} else {
self.last_consumed_offsets
.insert(partition_id, AtomicU64::new(message.header.offset));
}
if (self.store_after_every_nth_message > 0
&& message.header.offset % self.store_after_every_nth_message == 0)
|| self.store_offset_after_each_message
{
self.send_store_offset(partition_id, message.header.offset);
}
}
if self.buffered_messages.is_empty() {
if self.polling_strategy.kind == PollingKind::Offset {
self.polling_strategy = PollingStrategy::offset(message.header.offset + 1);
}
if self.store_offset_after_all_messages {
self.send_store_offset(partition_id, message.header.offset);
}
}
let current_offset;
if let Some(current_offset_entry) = self.current_offsets.get(&partition_id) {
current_offset = current_offset_entry.load(ORDERING);
} else {
current_offset = 0;
}
return Poll::Ready(Some(Ok(ReceivedMessage::new(
message,
current_offset,
partition_id,
))));
}
if self.poll_future.is_none() {
let future = self.create_poll_messages_future();
self.poll_future = Some(Box::pin(future));
}
while let Some(future) = self.poll_future.as_mut() {
match future.poll_unpin(cx) {
Poll::Ready(Ok(mut polled_messages)) => {
let partition_id = polled_messages.partition_id;
self.current_partition_id.store(partition_id, ORDERING);
if polled_messages.messages.is_empty() {
self.poll_future = Some(Box::pin(self.create_poll_messages_future()));
} else {
if let Some(ref encryptor) = self.encryptor {
for message in &mut polled_messages.messages {
let payload = encryptor.decrypt(&message.payload);
if payload.is_err() {
self.poll_future = None;
error!("Failed to decrypt the message payload at offset: {}, partition ID: {}", message.header.offset, partition_id);
let error = payload.unwrap_err();
return Poll::Ready(Some(Err(error)));
}
let payload = payload.unwrap();
message.payload = Bytes::from(payload);
message.header.payload_length = message.payload.len() as u32;
}
}
if let Some(current_offset_entry) = self.current_offsets.get(&partition_id)
{
current_offset_entry.store(polled_messages.current_offset, ORDERING);
} else {
self.current_offsets.insert(
partition_id,
AtomicU64::new(polled_messages.current_offset),
);
}
let message = polled_messages.messages.remove(0);
self.buffered_messages.extend(polled_messages.messages);
if self.polling_strategy.kind == PollingKind::Offset {
self.polling_strategy =
PollingStrategy::offset(message.header.offset + 1);
}
if let Some(last_consumed_offset_entry) =
self.last_consumed_offsets.get(&partition_id)
{
last_consumed_offset_entry.store(message.header.offset, ORDERING);
} else {
self.last_consumed_offsets
.insert(partition_id, AtomicU64::new(message.header.offset));
}
if (self.store_after_every_nth_message > 0
&& message.header.offset % self.store_after_every_nth_message == 0)
|| self.store_offset_after_each_message
|| (self.store_offset_after_all_messages
&& self.buffered_messages.is_empty())
{
self.send_store_offset(
polled_messages.partition_id,
message.header.offset,
);
}
self.poll_future = None;
return Poll::Ready(Some(Ok(ReceivedMessage::new(
message,
polled_messages.current_offset,
polled_messages.partition_id,
))));
}
}
Poll::Ready(Err(err)) => {
self.poll_future = None;
return Poll::Ready(Some(Err(err)));
}
Poll::Pending => return Poll::Pending,
}
}
Poll::Pending
}