tensorflow_io/python/experimental/kafka_batch_io_dataset_ops.py [108:146]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
            # Guard: this object must only be constructed through the public
            # factory path that sets `internal=True`.
            # NOTE(review): `assert` is stripped under `python -O`; an explicit
            # `raise` would be more robust — confirm before changing.
            assert internal

            # Normalize `stream_timeout`:
            #   -1   -> block indefinitely (modeled as sys.maxsize)
            #   >= 0 -> raised to at least `message_poll_timeout` so callers
            #           need not reason about the underlying polling cadence
            #   else -> rejected with ValueError
            if stream_timeout == -1:
                stream_timeout = sys.maxsize
            elif stream_timeout >= 0:
                # Taking the max of `stream_timeout` and `message_poll_timeout`
                # to prevent the user from bothering about the underlying polling
                # mechanism.
                stream_timeout = max(stream_timeout, message_poll_timeout)
            else:
                # NOTE(review): the message punctuation is off ("{} ,set") —
                # fixing it changes a runtime string, so it is only flagged here.
                raise ValueError(
                    "Invalid stream_timeout value: {} ,set it to -1 to block indefinitely.".format(
                        stream_timeout
                    )
                )
            # Build the "key=value" metadata list passed to the Kafka kernel,
            # appending consumer-group id and bootstrap servers when provided.
            # `configuration or []` tolerates a None/empty configuration.
            metadata = list(configuration or [])
            if group_id is not None:
                metadata.append("group.id=%s" % group_id)
            if servers is not None:
                metadata.append("bootstrap.servers=%s" % servers)
            # Initialize the underlying group-readable resource for `topics`.
            resource = core_ops.io_kafka_group_readable_init(
                topics=topics, metadata=metadata
            )

            self._resource = resource
            # An unbounded counter drives repeated `next` calls against the
            # resource; each mapped element presumably bundles the fetched
            # payload with a `continue_fetch` flag — TODO confirm op outputs.
            dataset = tf.data.experimental.Counter()
            dataset = dataset.map(
                lambda i: core_ops.io_kafka_group_readable_next(
                    input=self._resource,
                    index=i,
                    message_poll_timeout=message_poll_timeout,
                    stream_timeout=stream_timeout,
                )
            )
            # Stop the stream once the kernel signals end-of-fetch
            # (continue_fetch <= 0).
            dataset = dataset.apply(
                tf.data.experimental.take_while(
                    lambda v: tf.greater(v.continue_fetch, 0)
                )
            )
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



tensorflow_io/python/experimental/kafka_group_io_dataset_ops.py [152:190]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
            # NOTE(review): this fragment is a byte-for-byte duplicate of the
            # corresponding block in kafka_batch_io_dataset_ops.py — a shared
            # private helper would remove the duplication.
            # Guard: this object must only be constructed through the public
            # factory path that sets `internal=True`.
            # NOTE(review): `assert` is stripped under `python -O`; an explicit
            # `raise` would be more robust — confirm before changing.
            assert internal

            # Normalize `stream_timeout`:
            #   -1   -> block indefinitely (modeled as sys.maxsize)
            #   >= 0 -> raised to at least `message_poll_timeout` so callers
            #           need not reason about the underlying polling cadence
            #   else -> rejected with ValueError
            if stream_timeout == -1:
                stream_timeout = sys.maxsize
            elif stream_timeout >= 0:
                # Taking the max of `stream_timeout` and `message_poll_timeout`
                # to prevent the user from bothering about the underlying polling
                # mechanism.
                stream_timeout = max(stream_timeout, message_poll_timeout)
            else:
                # NOTE(review): the message punctuation is off ("{} ,set") —
                # fixing it changes a runtime string, so it is only flagged here.
                raise ValueError(
                    "Invalid stream_timeout value: {} ,set it to -1 to block indefinitely.".format(
                        stream_timeout
                    )
                )
            # Build the "key=value" metadata list passed to the Kafka kernel,
            # appending consumer-group id and bootstrap servers when provided.
            # `configuration or []` tolerates a None/empty configuration.
            metadata = list(configuration or [])
            if group_id is not None:
                metadata.append("group.id=%s" % group_id)
            if servers is not None:
                metadata.append("bootstrap.servers=%s" % servers)
            # Initialize the underlying group-readable resource for `topics`.
            resource = core_ops.io_kafka_group_readable_init(
                topics=topics, metadata=metadata
            )

            self._resource = resource
            # An unbounded counter drives repeated `next` calls against the
            # resource; each mapped element presumably bundles the fetched
            # payload with a `continue_fetch` flag — TODO confirm op outputs.
            dataset = tf.data.experimental.Counter()
            dataset = dataset.map(
                lambda i: core_ops.io_kafka_group_readable_next(
                    input=self._resource,
                    index=i,
                    message_poll_timeout=message_poll_timeout,
                    stream_timeout=stream_timeout,
                )
            )
            # Stop the stream once the kernel signals end-of-fetch
            # (continue_fetch <= 0).
            dataset = dataset.apply(
                tf.data.experimental.take_while(
                    lambda v: tf.greater(v.continue_fetch, 0)
                )
            )
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



