datahub/client/consumer/offset_manager.py (173 lines of code) (raw):
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import time
import logging
import threading
from collections import deque
from datahub.models import OffsetWithBatchIndex
from datahub.exceptions import SubscriptionOfflineException, ResourceNotFoundException, \
OffsetResetException, InvalidOperationException, DatahubException
from ..common.timer import Timer
from ..common.constant import Constant
class OffsetManager:
    """Track per-shard consumed offsets and commit them to DataHub in the background.

    Consumers enqueue one ``OffsetRequest`` per delivered record through
    ``send_record_offset``. A background thread (started from ``__init__``)
    waits on a ``Timer`` built from ``Constant.OFFSET_COMMIT_INTERVAL_TIMEOUT``.
    Whenever it expires, the thread does two things: it pops the ready requests
    from the head of each shard queue into ``_last_offset_map``, and it sends
    that map with ``update_subscription_offset``. All shared maps are guarded
    by ``_lock``.
    """

    # Pause (seconds) between polls while force-committing released shards,
    # so the waiting thread does not spin and starve the commit thread.
    _FORCE_COMMIT_POLL_INTERVAL = 0.1

    def __init__(self, coordinator):
        """Create the manager and start its background commit thread.

        :param coordinator: owning object. This class reads ``uniq_key``,
            ``project_name``, ``topic_name``, ``sub_id`` and
            ``meta_data.datahub_client`` from it. It also calls its
            ``on_sub_session_changed`` / ``on_sub_offline`` /
            ``on_sub_deleted`` / ``on_offset_not_ack`` callbacks.
        """
        self._closed = False
        self._logger = logging.getLogger(OffsetManager.__name__)
        self._coordinator = coordinator
        self._uniq_key = self._coordinator.uniq_key
        self._timer = Timer(Constant.OFFSET_COMMIT_INTERVAL_TIMEOUT)
        self._lock = threading.Lock()
        # shard_id -> offset meta (supplies version_id / session_id for commits)
        self._offset_meta_map = dict()
        # shard_id -> deque of OffsetRequest, in the order they were sent
        self._offset_request_queue_map = dict()
        # shard_id -> OffsetWithBatchIndex waiting to be committed
        self._last_offset_map = dict()
        self.__start()

    def close(self):
        """Stop the commit thread and wait for it to exit.

        The thread attempts one last sync/commit before exiting.
        """
        self._closed = True
        self._timer.notify_all()
        self._commit_task.join()

    def set_offset_meta(self, consume_offset_map):
        """Register offset metadata for the given shards.

        Each shard gets its meta stored and a fresh, empty request queue.
        Any requests previously queued for that shard are discarded.
        """
        with self._lock:
            for shard_id, consume_offset in consume_offset_map.items():
                self._offset_meta_map[shard_id] = consume_offset
                self._offset_request_queue_map[shard_id] = deque()

    def on_shard_release(self, del_shards):
        """Flush pending offsets of ``del_shards`` (best effort), then forget those shards."""
        self.__force_commit_offset(del_shards)
        with self._lock:
            for shard_id in del_shards:
                self._offset_meta_map.pop(shard_id, None)
                self._offset_request_queue_map.pop(shard_id, None)

    def on_offset_reset(self):
        """Drop all pending offsets, request queues and offset metadata."""
        with self._lock:
            self._last_offset_map.clear()
            self._offset_request_queue_map.clear()
            self._offset_meta_map.clear()

    def send_record_offset(self, message_key):
        """Queue ``message_key`` so its offset is committed once it reports ready.

        :raises DatahubException: if the key's shard has no request queue here.
        """
        shard_id = message_key.shard_id
        with self._lock:
            # Single lookup under the lock: the shard may be released concurrently.
            queue = self._offset_request_queue_map.get(shard_id)
            if queue is None:
                self._logger.warning("Send record offset error. shard_id: {}, key: {}".format(shard_id, self._uniq_key))
                raise DatahubException("Send record offset error")
            queue.append(OffsetRequest(message_key))
        self._logger.debug("Send record offset success. shard_id: {}, key: {}, offset: {}"
                           .format(shard_id, self._uniq_key, message_key.offset.to_string()))

    def __start(self):
        """Start the background offset-commit thread."""
        self._commit_task = threading.Thread(target=self.__commit_offset_task, name="OffsetManager")
        self._commit_task.start()

    def __commit_offset_task(self):
        """Run the commit loop on the background thread.

        When the commit timer is expired, offsets are synced and committed
        under ``_lock``, and the timer is re-armed whether or not the commit
        succeeded. Otherwise the loop waits up to
        ``Constant.OFFSET_CHECK_TIMEOUT`` for the timer.

        Known subscription errors are logged and forwarded to the coordinator.
        Any other error is logged and re-raised, which ends the thread. After
        ``close()``, a final sync/commit is attempted before the thread exits.
        """
        self._logger.info("Offset commit task start. key: {}".format(self._uniq_key))
        while not self._closed:
            if self._timer.is_expired():
                try:
                    with self._lock:
                        self.__sync_offsets()
                        self.__commit_offsets()
                except OffsetResetException as e:
                    self._logger.warning("CommitOffset fail, subscription offset reset. key:{}. last offset map: {}. {}".format(
                        self._uniq_key, self._last_offset_map, e))
                except InvalidOperationException as e:
                    self._logger.warning("CommitOffset fail, subscription session invalid. key:{}. {}".format(self._uniq_key, e))
                    self._coordinator.on_sub_session_changed()
                except SubscriptionOfflineException as e:
                    self._logger.warning("CommitOffset fail, subscription offline. key:{}. {}".format(self._uniq_key, e))
                    self._coordinator.on_sub_offline()
                except ResourceNotFoundException as e:
                    # error_code may be missing; don't let the membership test itself raise.
                    if "NoSuchSubscription" in (e.error_code or ""):
                        self._logger.warning("CommitOffset fail, subscription deleted. key:{}. {}".format(self._uniq_key, e))
                        self._coordinator.on_sub_deleted()
                    else:
                        self._logger.warning("CommitOffset fail, resource not found. key:{}. {}".format(self._uniq_key, e))
                except Exception as e:
                    self._logger.warning("CommitOffset fail. key:{}. {}".format(self._uniq_key, e))
                    raise
                finally:
                    # Re-arm even after a failure. Otherwise the timer stays expired
                    # and the loop retries the commit (and re-fires the coordinator
                    # callbacks above) in a tight loop.
                    self._timer.reset()
            else:
                try:
                    self._timer.wait_expire(Constant.OFFSET_CHECK_TIMEOUT)
                except Exception as e:
                    self._logger.warning("OffsetCommitTask interrupt occur. key: {}, {}".format(self._uniq_key, e))
                    break
        # Final flush on shutdown. A failure is logged rather than escaping the thread.
        try:
            with self._lock:
                self.__sync_offsets()
                self.__commit_offsets()
        except Exception as e:
            self._logger.warning("Final offset commit fail. key: {}. {}".format(self._uniq_key, e))
        self._logger.info("Offset commit task stop. key: {}".format(self._uniq_key))

    def __force_commit_offset(self, shard_ids):
        """Best-effort flush of ``shard_ids`` before they are released.

        Repeatedly asks the commit thread to run now. It stops when every listed
        shard's request queue is empty or ``Constant.FORCE_COMMIT_TIMEOUT``
        elapses. Errors are logged, not raised.
        """
        try:
            timer = Timer(Constant.FORCE_COMMIT_TIMEOUT)
            self.__commit_right_now()
            while not timer.is_expired() and not self.is_request_queue_empty(shard_ids):
                # Sleep between polls instead of spinning, so the commit thread can
                # get the GIL and ``_lock`` and actually make progress.
                time.sleep(self._FORCE_COMMIT_POLL_INTERVAL)
                self.__commit_right_now()
        except Exception as e:
            self._logger.warning("Force commit offset fail. key:{}, shard_ids: {}, {}".format(self._uniq_key, shard_ids, e))

    def __commit_right_now(self):
        """Reset the commit timer's deadline and wake the commit thread.

        NOTE(review): presumably ``reset_deadline`` makes the timer due
        immediately; confirm against ``Timer``.
        """
        self._timer.reset_deadline()
        self._timer.notify_all()

    def is_request_queue_empty(self, shard_ids):
        """Return True if none of ``shard_ids`` has queued requests.

        Shards that have no queue count as empty.
        """
        with self._lock:
            return not any(self._offset_request_queue_map.get(shard_id) for shard_id in shard_ids)

    def __sync_offsets(self):
        """Record the newest ready offset of every shard in ``_last_offset_map``.

        Callers hold ``_lock``. For each shard, ready requests are popped from
        the head of its queue, stopping at the first request that is not ready.
        The offset of the last popped request is then recorded.

        If the head request is not ready and has waited more than
        ``Constant.NOT_ACK_WARNING_TIMEOUT`` seconds, a warning is logged. Past
        ten times that wait, ``on_offset_not_ack`` is also called on the
        coordinator.

        :raises DatahubException: if a shard with ready requests has no offset meta.
        """
        for shard_id, request_queue in self._offset_request_queue_map.items():
            last_ready = None
            while request_queue and request_queue[0].is_ready():
                last_ready = request_queue.popleft()
            if last_ready is not None:
                meta = self._offset_meta_map.get(shard_id)
                if not meta:
                    self._logger.warning("OffsetMeta not found. key:{}, shard_id:{}".format(self._uniq_key, shard_id))
                    raise DatahubException("OffsetMeta not found")
                consume_offset = last_ready.message_key.offset
                self._last_offset_map[shard_id] = OffsetWithBatchIndex(
                    consume_offset.sequence,
                    consume_offset.timestamp,
                    meta.version_id,
                    meta.session_id,
                    consume_offset.batch_index
                )
                self._logger.debug("Sync offset once success. key: {}, shard_id: {}".format(self._uniq_key, shard_id))
            elif request_queue:
                # The earliest-enqueued request is still not ready.
                now = int(time.time())
                diff = now - request_queue[0].timestamp
                if diff > Constant.NOT_ACK_WARNING_TIMEOUT:
                    self._logger.warning("Record not ack for {} s. key:{}, shard_id:{}, currTs:{}, offset:{}"
                                         .format(diff, self._uniq_key, shard_id, now, request_queue[0].message_key.to_string()))
                    if diff > Constant.NOT_ACK_WARNING_TIMEOUT * 10:
                        self._coordinator.on_offset_not_ack()

    def __commit_offsets(self):
        """Send ``_last_offset_map`` to DataHub and clear it on success.

        Does nothing when the map is empty. On failure the error is logged and
        re-raised, and the map is left intact.
        """
        if not self._last_offset_map:
            return
        try:
            self._coordinator.meta_data.datahub_client.update_subscription_offset(
                self._coordinator.project_name,
                self._coordinator.topic_name,
                self._coordinator.sub_id,
                self._last_offset_map
            )
        except DatahubException as e:
            self._logger.warning("Commit offset fail. key: {}, min offset = {}, DatahubException: {}".format(self._uniq_key, self.__get_min_timestamp(), e))
            raise
        except Exception as e:
            self._logger.warning("Commit offset fail. key: {}, min offset = {}, {}".format(self._uniq_key, self.__get_min_timestamp(), e))
            raise
        else:
            self._logger.info("Commit offset success. key: {}, min offset = {}".format(self._uniq_key, self.__get_min_timestamp()))
            self._last_offset_map.clear()

    def __get_min_timestamp(self):
        """Return the pending offset with the smallest ``timestamp``, or None if none is pending.

        Used only for log messages.
        """
        if not self._last_offset_map:
            return None
        return min(self._last_offset_map.values(), key=lambda offset: offset.timestamp)
class OffsetRequest:
    """A queued request to commit the offset of one consumed record.

    The creation time (whole seconds since the epoch) is kept so the offset
    manager can report records that stay un-acked for too long.
    """

    def __init__(self, message_key):
        # The key being tracked, and the moment this request was queued.
        self._message_key = message_key
        self._timestamp = int(time.time())

    def is_ready(self):
        """Return whatever the wrapped message key's ``is_ready()`` returns."""
        key = self._message_key
        return key.is_ready()

    @property
    def message_key(self):
        """The message key this request was created for."""
        return self._message_key

    @property
    def timestamp(self):
        """Creation time as ``int(time.time())``."""
        return self._timestamp