in datahub/client/consumer/shard_reader.py [0:0]
def __init__(self, project_name, topic_name, sub_id, message_reader, shard_id, offset, fetch_num):
    """Set up the initial state of a reader bound to a single shard.

    Stores the arguments on the instance and creates fresh counters,
    synchronisation primitives and a record queue.

    Args:
        project_name: Stored as given; first component of ``_uniq_key``.
        topic_name: Stored as given; second component of ``_uniq_key``.
        sub_id: Stored as given; third component of ``_uniq_key``.
        message_reader: Stored as given (presumably the object used to
            fetch records -- confirm against the rest of the class).
        shard_id: Stored as given.
        offset: Becomes the initial value of ``_read_offset``.
        fetch_num: Stored as given (presumably a per-fetch record
            count -- confirm against the fetch logic).
    """
    # Lifecycle flag and logger (named after the ShardReader class).
    self._closed = False
    self._logger = logging.getLogger(ShardReader.__name__)

    # Identity: the three parts are kept individually and also joined
    # into one colon-separated key.
    identity = (project_name, topic_name, sub_id)
    self._project_name, self._topic_name, self._sub_id = identity
    self._uniq_key = "{}:{}:{}".format(*identity)

    # Read position and collaborators handed in by the caller.
    self._shard_id = shard_id
    self._read_offset = offset
    self._fetch_num = fetch_num
    self._message_reader = message_reader

    # Counters, both starting at zero.
    self._has_read_count = atomic.AtomicLong(0)
    self._remain_records = atomic.AtomicLong(0)

    # Synchronisation primitives and the queue of cached records
    # (created without a maxsize argument).
    self._read_lock = threading.Lock()
    self._fetch_lock = threading.Condition()
    self._cache_record_queue = queue.Queue()