fn poll_next()

in core/sdk/src/clients/consumer.rs [821:951]


    /// Yields the next received message of this consumer.
    ///
    /// Messages left over from a previously polled batch are handed out first.
    /// When the buffer is empty, the in-flight poll future is driven (one is
    /// created if none exists). An empty batch immediately triggers another
    /// poll; a non-empty batch is decrypted when an encryptor is configured, its
    /// first message is returned and the remaining ones are buffered.
    ///
    /// For every yielded message the last consumed offset of its partition is
    /// recorded, and an offset-store request is sent when any of the configured
    /// triggers applies (every message, every offset divisible by
    /// `store_after_every_nth_message`, or the last message of a batch). With
    /// offset-based polling the polling strategy is advanced past the message.
    ///
    /// Returns `Poll::Pending` while the poll future is pending and
    /// `Poll::Ready(Some(Err(_)))` when polling or decryption fails. This method
    /// never returns `Poll::Ready(None)`.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let partition_id = self.current_partition_id.load(ORDERING);
        if let Some(message) = self.buffered_messages.pop_front() {
            let offset = message.header.offset;
            let batch_drained = self.buffered_messages.is_empty();

            if let Some(last_consumed_offset_entry) =
                self.last_consumed_offsets.get(&partition_id)
            {
                last_consumed_offset_entry.store(offset, ORDERING);
            } else {
                self.last_consumed_offsets
                    .insert(partition_id, AtomicU64::new(offset));
            }

            // The next poll only happens once the buffer is empty, so the
            // strategy is moved forward on the last buffered message only.
            if batch_drained && self.polling_strategy.kind == PollingKind::Offset {
                self.polling_strategy = PollingStrategy::offset(offset + 1);
            }

            // All store triggers are combined so that a single message never
            // sends the same offset twice (mirrors the freshly-polled path below).
            let nth = self.store_after_every_nth_message;
            let should_store_offset = (nth > 0 && offset % nth == 0)
                || self.store_offset_after_each_message
                || (self.store_offset_after_all_messages && batch_drained);
            if should_store_offset {
                self.send_store_offset(partition_id, offset);
            }

            // Falls back to 0 when no current offset is known for the partition.
            let current_offset = self
                .current_offsets
                .get(&partition_id)
                .map_or(0, |entry| entry.load(ORDERING));

            return Poll::Ready(Some(Ok(ReceivedMessage::new(
                message,
                current_offset,
                partition_id,
            ))));
        }

        if self.poll_future.is_none() {
            let future = self.create_poll_messages_future();
            self.poll_future = Some(Box::pin(future));
        }

        while let Some(future) = self.poll_future.as_mut() {
            let mut polled_messages = match future.poll_unpin(cx) {
                Poll::Pending => return Poll::Pending,
                Poll::Ready(Err(err)) => {
                    self.poll_future = None;
                    return Poll::Ready(Some(Err(err)));
                }
                Poll::Ready(Ok(polled_messages)) => polled_messages,
            };

            let partition_id = polled_messages.partition_id;
            let current_offset = polled_messages.current_offset;
            self.current_partition_id.store(partition_id, ORDERING);

            if polled_messages.messages.is_empty() {
                // Nothing to yield: start a new poll and drive it right away.
                self.poll_future = Some(Box::pin(self.create_poll_messages_future()));
                continue;
            }

            if let Some(ref encryptor) = self.encryptor {
                for message in &mut polled_messages.messages {
                    match encryptor.decrypt(&message.payload) {
                        Ok(payload) => {
                            message.payload = Bytes::from(payload);
                            // The header must describe the decrypted payload.
                            message.header.payload_length = message.payload.len() as u32;
                        }
                        Err(error) => {
                            // The whole batch is dropped on the first failure.
                            self.poll_future = None;
                            error!("Failed to decrypt the message payload at offset: {}, partition ID: {}", message.header.offset, partition_id);
                            return Poll::Ready(Some(Err(error)));
                        }
                    }
                }
            }

            if let Some(current_offset_entry) = self.current_offsets.get(&partition_id) {
                current_offset_entry.store(current_offset, ORDERING);
            } else {
                self.current_offsets
                    .insert(partition_id, AtomicU64::new(current_offset));
            }

            // Consume the batch by value: the first message is yielded now and
            // the rest are buffered, without shifting elements as `remove(0)` would.
            let mut remaining_messages = polled_messages.messages.into_iter();
            let message = match remaining_messages.next() {
                Some(message) => message,
                None => unreachable!("the batch was checked to be non-empty"),
            };
            self.buffered_messages.extend(remaining_messages);

            let offset = message.header.offset;
            if self.polling_strategy.kind == PollingKind::Offset {
                self.polling_strategy = PollingStrategy::offset(offset + 1);
            }

            if let Some(last_consumed_offset_entry) =
                self.last_consumed_offsets.get(&partition_id)
            {
                last_consumed_offset_entry.store(offset, ORDERING);
            } else {
                self.last_consumed_offsets
                    .insert(partition_id, AtomicU64::new(offset));
            }

            let nth = self.store_after_every_nth_message;
            let should_store_offset = (nth > 0 && offset % nth == 0)
                || self.store_offset_after_each_message
                || (self.store_offset_after_all_messages && self.buffered_messages.is_empty());
            if should_store_offset {
                self.send_store_offset(partition_id, offset);
            }

            self.poll_future = None;
            return Poll::Ready(Some(Ok(ReceivedMessage::new(
                message,
                current_offset,
                partition_id,
            ))));
        }

        // Not reached in practice: a poll future is always installed before the
        // loop and every iteration either returns or installs a new one.
        Poll::Pending
    }