in datahub/client/producer/shard_writer.py [0:0]
def __init__(self, project_name, topic_name, sub_id, message_writer, producer_config, shard_id):
    """Set up the writer state for a single shard.

    :param project_name: project name; stored and used as the first part of the unique key.
    :param topic_name: topic name; stored and used as the second part of the unique key.
    :param sub_id: subscription id; stored and used as the third part of the unique key.
    :param message_writer: stored as-is on the instance for later use.
    :param producer_config: configuration object; ``retry_times`` and the four
        ``max_async_buffer_*`` / ``max_record_pack_queue_limit`` values are read from it,
        and the object itself is handed to ``DatahubFactory.create_datahub_client``.
    :param shard_id: identifier of the shard this writer is bound to.
    """
    # Lifecycle flag, logger and the instance lock.
    self._closed = False
    self._logger = logging.getLogger(ShardWriter.__name__)
    self._lock = threading.Lock()

    # Identity of the target; the unique key is the three parts joined by ':'.
    identity = (project_name, topic_name, sub_id)
    self._project_name, self._topic_name, self._sub_id = identity
    self._uniq_key = "{}:{}:{}".format(*identity)

    # Collaborators and settings taken straight from the arguments.
    self._message_writer = message_writer
    self._shard_id = shard_id
    self._max_retry_times = producer_config.retry_times

    # Counters and the condition variable; both counters are constructed with 0
    # (presumably the initial value -- confirm against the atomic library).
    self._task_num = atomic.AtomicLong(0)
    self._condition = threading.Condition()
    self._has_write_count = atomic.AtomicLong(0)

    # Client is created before the queue, in the same order as attribute reads below.
    self._datahub_client = DatahubFactory.create_datahub_client(producer_config)

    buffer_size = producer_config.max_async_buffer_size
    buffer_records = producer_config.max_async_buffer_records
    buffer_time = producer_config.max_async_buffer_time
    queue_limit = producer_config.max_record_pack_queue_limit
    self._record_package_queue = RecordPackQueue(buffer_size, buffer_records, buffer_time, queue_limit)