datahub/client/consumer/shard_reader.py (165 lines of code) (raw):
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import logging
import queue
import threading
import atomic
from enum import Enum
from datahub.exceptions import InvalidParameterException, DatahubException
from .message_key import MessageKey
from ..common.timer import Timer
from ..common.constant import Constant
from ..common.offset_meta import ConsumeOffset
class CompleteType(Enum):
T_NORMAL = "T_NORMAL",
T_EXCEPTION = "T_EXCEPTION",
T_DELAY = "T_DELAY"
class CompleteFetch:
def __init__(self, complete_type):
self._complete_type = complete_type
self._records = None
self._exception = None
self._timer = None
@property
def complete_type(self):
return self._complete_type
@property
def records(self):
if self._complete_type != CompleteType.T_NORMAL:
raise DatahubException("CompleteType error. type: {}".format(self._complete_type))
return self._records
@records.setter
def records(self, value):
if self._complete_type != CompleteType.T_NORMAL:
raise DatahubException("CompleteType error. type: {}".format(self._complete_type))
self._records = value
@property
def exception(self):
if self._complete_type != CompleteType.T_EXCEPTION:
raise DatahubException("CompleteType error. type: {}".format(self._complete_type))
return self._exception
@exception.setter
def exception(self, value):
if self._complete_type != CompleteType.T_EXCEPTION:
raise DatahubException("CompleteType error. type: {}".format(self._complete_type))
self._exception = value
@property
def timer(self):
if self._complete_type != CompleteType.T_DELAY:
raise DatahubException("CompleteType error. type: {}".format(self._complete_type))
return self._timer
@timer.setter
def timer(self, value):
if self._complete_type != CompleteType.T_DELAY:
raise DatahubException("CompleteType error. type: {}".format(self._complete_type))
self._timer = value
class ShardReader:
def __init__(self, project_name, topic_name, sub_id, message_reader, shard_id, offset, fetch_num):
self._closed = False
self._logger = logging.getLogger(ShardReader.__name__)
self._project_name = project_name
self._topic_name = topic_name
self._sub_id = sub_id
self._uniq_key = "{}:{}:{}".format(project_name, topic_name, sub_id)
self._message_reader = message_reader
self._shard_id = shard_id
self._read_offset = offset
self._fetch_num = fetch_num
self._has_read_count = atomic.AtomicLong(0)
self._read_lock = threading.Lock()
self._fetch_lock = threading.Condition()
self._cache_record_queue = queue.Queue()
self._remain_records = atomic.AtomicLong(0)
def close(self):
self._closed = True
self._logger.info("ShardReader closed. key: {}, shard_id: {}, read count: {}".format(self._uniq_key, self._shard_id, self._has_read_count.value))
def read(self, timeout):
if self._closed:
self._logger.warning("ShardReader closed when read. key: {}, shard_id: {}".format(self._uniq_key, self._shard_id))
raise DatahubException("ShardReader closed when read")
record = self.__read_next(timeout)
if record:
offset = ConsumeOffset(record.sequence, record.system_time, record.batch_index)
offset.next_cursor = self._read_offset.next_cursor
record.record_key = MessageKey(self._shard_id, offset)
self._has_read_count.get_and_set(self._has_read_count.value + 1)
return record
def reset_offset(self):
self._read_offset.reset_timestamp(-1)
@property
def shard_id(self):
return self._shard_id
def __read_next(self, timeout):
timer = Timer(max(timeout, Constant.MIN_TIMEOUT_WAIT_FETCH))
with self._read_lock:
record = None
while not self._closed and record is None and not timer.is_expired():
if self._remain_records.value > 0:
complete_fetch = self._cache_record_queue.get()
self._remain_records.get_and_set(self._remain_records.value-1)
if complete_fetch.complete_type == CompleteType.T_NORMAL:
return complete_fetch.records
elif complete_fetch.complete_type == CompleteType.T_EXCEPTION:
raise complete_fetch.exception
else:
try:
complete_fetch.timer.wait_expire()
except Exception as e:
raise e
else:
self._message_reader.send_task(self.__gen_next_fetch_task, self.__deal_with_task)
with self._fetch_lock:
try:
self._fetch_lock.wait_for(self.__not_empty, timer.deadline_time-Timer.get_curr_time())
except Exception as e:
raise e
return record
def __not_empty(self):
return not self._cache_record_queue.empty()
def __gen_next_fetch_task(self):
try:
cursor = self._message_reader.get_cursor(self._shard_id, self._read_offset)
if not cursor:
raise InvalidParameterException("Get cursor is None. shard_id: {}, offset: {}".format(self._shard_id, self._read_offset.to_string()))
record_result = self._message_reader.get_records(self._shard_id, cursor, self._fetch_num)
return record_result
except DatahubException as e:
self._logger.warning("Generate fetch task fail. key: {}. DatahubException: {}".format(self._uniq_key, e))
raise e
except Exception as e:
self._logger.warning("Generate fetch task fail. key: {}. Exception: {}".format(self._uniq_key, e))
raise e
def __deal_with_task(self, completed_task):
with self._fetch_lock:
if completed_task.exception():
self.__push_with_exception(completed_task.exception())
else:
record_result = completed_task.result()
if record_result.record_count > 0:
self.__push_with_records(record_result)
else:
self.__push_with_delay(record_result, Constant.DELAY_TIMEOUT_FOR_READ_END)
self._fetch_lock.notify_all()
def __push_with_exception(self, exception):
complete_fetch = CompleteFetch(CompleteType.T_EXCEPTION)
complete_fetch.exception = exception
self._cache_record_queue.put(complete_fetch)
self._remain_records.get_and_set(self._remain_records.value + 1)
self._read_offset.next_cursor = None
self._logger.warning("Push to cache queue with exception. shard_id: {}, key: {}, exception: {}"
.format(self._shard_id, self._uniq_key, exception))
def __push_with_records(self, record_result):
for tmp_record in record_result.records:
complete_fetch = CompleteFetch(CompleteType.T_NORMAL)
complete_fetch.records = tmp_record
self._cache_record_queue.put(complete_fetch)
self._remain_records.get_and_set(self._remain_records.value + record_result.record_count)
self._read_offset.next_cursor = record_result.next_cursor
self._logger.debug("Push to cache queue with records. shard_id: {}, key: {}, record count: {}"
.format(self._shard_id, self._uniq_key, record_result.record_count))
def __push_with_delay(self, record_result, timeout):
complete_fetch = CompleteFetch(CompleteType.T_DELAY)
complete_fetch.timer = Timer(timeout)
self._cache_record_queue.put(complete_fetch)
self._remain_records.get_and_set(self._remain_records.value + 1)
self._read_offset.next_cursor = record_result.next_cursor
self._logger.debug("Push to cache queue with delay. shard_id: {}, key: {}, delay timeout: {}"
.format(self._shard_id, self._uniq_key, timeout))