datahub/client/consumer/shard_group_reader.py (136 lines of code) (raw):
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import logging
import threading
from datahub.exceptions import *
from .shard_reader import ShardReader
from .offset_select_strategy import OffsetSelectStrategy
from ..common.timer import Timer
from ..common.constant import Constant
class ShardGroupReader:
    """Read records from the group of shards handed to this consumer.

    One ``ShardReader`` is kept per shard in ``_shard_reader_map``. The
    instance registers ``on_shard_change`` and ``on_remove_all_shards`` with
    the coordinator, so the reader map is mutated from those callbacks as
    well as from ``close``; every access to the map happens under ``_lock``.
    """

    def __init__(self, coordinator, shard_ids, timestamp):
        """Set up the group reader and create readers for ``shard_ids``.

        Args:
            coordinator: Object providing shard metadata, offsets and the
                callback registration hooks used below.
            shard_ids: Initial shard ids; ``None`` or empty means no reader
                is created up front.
            timestamp: Passed to every initial offset's ``reset_timestamp``
                unless it is ``-1``.

        Raises:
            Whatever reader creation raises (see ``__create_shard_reader``).
        """
        self._closed = False
        self._logger = logging.getLogger(ShardGroupReader.__name__)
        self._coordinator = coordinator
        self._coordinator.assign_shard_list = shard_ids if shard_ids else []
        self._shard_reader_map = dict()
        self._select_strategy = OffsetSelectStrategy()
        self._lock = threading.Lock()
        # Callbacks are registered before the initial readers are created.
        self._coordinator.register_shard_change(self.on_shard_change)
        self._coordinator.register_remove_all_shards(self.on_remove_all_shards)
        self.__create_shard_reader(shard_ids, timestamp)

    def close(self):
        """Mark the group reader closed and close every reader it holds."""
        # The flag is raised before taking the lock so that a running read()
        # loop stops and late shard-change callbacks create nothing new.
        self._closed = True
        with self._lock:
            for reader in self._shard_reader_map.values():
                reader.close()
            self._shard_reader_map.clear()
        self._logger.info("ShardGroupReader close success. key: {}".format(self._coordinator.uniq_key))

    def on_shard_change(self, add_shards, del_shards):
        """Coordinator callback: create readers for added shards, then drop removed ones."""
        self.__create_shard_reader(add_shards, -1)
        self.__remover_shard_reader(del_shards)

    def on_remove_all_shards(self):
        """Coordinator callback: close and forget every shard reader."""
        self.__remove_all_shard_reader()

    def read(self, shard_id, time_out):
        """Read one record, retrying until one arrives or ``time_out`` expires.

        Args:
            shard_id: Shard to read from; a falsy value lets the select
                strategy pick the shard.
            time_out: Handed to ``Timer``; bounds the retry loop.

        Returns:
            The record that was read, or ``None`` if the timer expired or
            the group reader was closed before a record was obtained.

        Raises:
            DatahubException: If the group reader is already closed, or if
                an explicitly requested shard has no reader.
        """
        if self._closed:
            self._logger.warning("ShardGroupReader closed when read. key: {}".format(self._coordinator.uniq_key))
            raise DatahubException("ShardGroupReader closed when read")

        record = None
        timer = Timer(time_out)
        while not self._closed and record is None and not timer.is_expired():
            if self._coordinator.waiting_shard_assign():
                timer.wait_expire(Constant.DELAY_TIMEOUT_FOR_NOT_READY)
                continue

            self._coordinator.update_shard_info()
            # Only the reader lookup runs under the lock; the read itself
            # does not, so shard-change callbacks are not blocked by it.
            with self._lock:
                reader = self.__get_next_reader(shard_id)
            if reader is None:
                timer.wait_expire(Constant.DELAY_TIMEOUT_FOR_NOT_READY)
            else:
                record = self.__read_by_reader(reader)
        return record

    def __read_by_reader(self, reader):
        """Read once from ``reader`` and report the outcome to the coordinator.

        A sealed shard is reported through ``on_shard_read_end`` and an
        invalid cursor makes the reader reset its offset; both cases return
        ``None``. Any other exception is logged and re-raised.
        """
        record = None
        try:
            # NOTE(review): the meaning of the literal 1 is defined by
            # ShardReader.read (presumably a timeout) - confirm there.
            record = reader.read(1)
            self._select_strategy.after_read(reader.shard_id, record)
            if record:
                self._coordinator.send_record_offset(record.record_key)
                if self._coordinator.auto_ack_offset:
                    record.record_key.ack()
        except ShardSealedException as e:  # error_code: 'InvalidShardOperation'
            self._logger.warning("Read fail. Shard read end. shard_id: {}, key: {}, {}"
                                 .format(reader.shard_id, self._coordinator.uniq_key, e))
            self._coordinator.on_shard_read_end([reader.shard_id])
        except InvalidCursorException as e:  # error_code: 'InvalidCursor'
            self._logger.warning("Read fail. Invalid cursor. shard_id: {}, key: {}, {}"
                                 .format(reader.shard_id, self._coordinator.uniq_key, e))
            reader.reset_offset()
        except DatahubException as e:
            self._logger.warning("Read fail. shard_id: {}, key: {}. DatahubException: {}"
                                 .format(reader.shard_id, self._coordinator.uniq_key, e))
            raise
        except Exception as e:
            self._logger.warning("Read fail. shard_id: {}, key: {}. Exception: {}"
                                 .format(reader.shard_id, self._coordinator.uniq_key, e))
            raise
        return record

    def __create_shard_reader(self, shard_ids, timestamp=-1):
        """Create a ``ShardReader`` for each id in ``shard_ids`` that has none yet.

        Does nothing for ``None``/empty input, and nothing once the group
        reader is closed: readers created after ``close()`` would never be
        closed by anyone.

        Raises:
            InvalidParameterException: If a shard id has no metadata.
            Exception: Any other failure is logged and re-raised.
        """
        with self._lock:
            if not shard_ids:
                return
            if self._closed:
                # close() raises the flag before it takes the lock, so a
                # callback that gets here afterwards must not add readers.
                self._logger.warning("ShardReader create skipped. ShardGroupReader closed. key: {}, shard_ids: {}"
                                     .format(self._coordinator.uniq_key, shard_ids))
                return
            try:
                shard_meta_map = self._coordinator.meta_data.shard_meta_map
                shards_offset_map = self.__gen_shards_offset(shard_ids, timestamp)
                for shard_id in shard_ids:
                    if shard_meta_map.get(shard_id) is None:
                        raise InvalidParameterException(
                            "Shard not found. key: {}, shard_id: {}".format(self._coordinator.uniq_key, shard_id))
                    if shard_id in self._shard_reader_map:
                        continue  # already being read
                    consume_offset = shards_offset_map.get(shard_id)
                    reader = ShardReader(self._coordinator.project_name, self._coordinator.topic_name,
                                         self._coordinator.sub_id, self._coordinator.meta_data.message_reader,
                                         shard_id, consume_offset, self._coordinator.fetch_limit)
                    self._shard_reader_map[shard_id] = reader
                    self._select_strategy.add_shard(shard_id)
                    self._logger.info("ShardReader created. key: {}, shard_id: {}, sequence: {}"
                                      .format(self._coordinator.uniq_key, shard_id, consume_offset.sequence))
            except DatahubException as e:
                self._logger.warning("ShardReader create fail. key: {}, shard_ids: {}, DatahubException: {}"
                                     .format(self._coordinator.uniq_key, shard_ids, e))
                raise
            except Exception as e:
                self._logger.warning("ShardReader create fail. key: {}, shard_ids: {}, {}"
                                     .format(self._coordinator.uniq_key, shard_ids, e))
                raise

    def __remover_shard_reader(self, shard_ids):
        """Close and forget the readers of ``shard_ids``; unknown ids are ignored."""
        with self._lock:
            if not shard_ids:
                return
            for shard_id in shard_ids:
                reader = self._shard_reader_map.get(shard_id)
                if reader is None:
                    continue
                # Close first: if close() raises, the reader stays tracked.
                reader.close()
                self._shard_reader_map.pop(shard_id)
                self._select_strategy.remove_shard(shard_id)
                self._logger.info("ShardReader removed. key: {}, shard_id: {}"
                                  .format(self._coordinator.uniq_key, shard_id))

    def __remove_all_shard_reader(self):
        """Close every reader and remove its shard from the map and the strategy."""
        with self._lock:
            # Iterate over a snapshot because the map shrinks inside the loop.
            for shard_id in list(self._shard_reader_map):
                self._shard_reader_map[shard_id].close()
                self._shard_reader_map.pop(shard_id)
                self._select_strategy.remove_shard(shard_id)
                self._logger.info("ShardReader removed when remove all. key: {}, shard_id: {}"
                                  .format(self._coordinator.uniq_key, shard_id))

    def __gen_shards_offset(self, shard_ids, timestamp=-1):
        """Return the coordinator's offsets for ``shard_ids``.

        When ``timestamp`` is not ``-1`` every returned offset has
        ``reset_timestamp(timestamp)`` applied before the map is returned.
        """
        offset_map = self._coordinator.init_and_get_offset(shard_ids)
        if timestamp != -1:
            for offset in offset_map.values():
                offset.reset_timestamp(timestamp)
        return offset_map

    def __get_next_reader(self, shard_id):
        """Pick the reader to use for the next read; call with ``_lock`` held.

        An explicit ``shard_id`` wins; otherwise the select strategy is
        asked for a shard. If neither yields a shard id, any existing
        reader is returned, or ``None`` when there is none.

        Raises:
            DatahubException: If the chosen shard id has no reader.
        """
        target = shard_id if shard_id else self._select_strategy.get_next_shard()
        if target:
            reader = self._shard_reader_map.get(target)
            if not reader:
                raise DatahubException("ShardReader not found. key: {}, shard_id: {}"
                                       .format(self._coordinator.uniq_key, target))
            return reader

        if len(self._shard_reader_map) == 0:
            self._logger.warning("No ShardReader found. May the consumer group in rebalance state. key: {}"
                                 .format(self._coordinator.uniq_key))
            return None
        return next(iter(self._shard_reader_map.values()))