datahub/client/common/meta_data.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import logging
import atomic
from datahub.models import RecordType
from datahub.exceptions import DatahubException
from .timer import Timer
from .constant import Constant
from .datahub_factory import DatahubFactory
from ..consumer.message_reader import MessageReader
from ..producer.message_writer import MessageWriter
class MetaData:
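    """Shared metadata for one client key (project / topic / optional subscription).

    Caches the topic record type and schema, periodically refreshes shard
    metadata and notifies registered coordinators of shard changes, and owns
    either an async MessageReader (consumer, when a subscription id is given)
    or a MessageWriter (producer).
    """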
def __init__(self, key, project_name, topic_name, sub_id, common_config):
self._class_key = key
self._logger = logging.getLogger(MetaData.__name__)
self._endpoint = common_config.endpoint
self._datahub_client = DatahubFactory.create_datahub_client(common_config)
# coordinators set
self._coordinators = set()
        # Topic meta (fetched once at construction)
self._topic_meta = self.__init_topic_meta(project_name, topic_name)
        # Shard meta (refreshed periodically while the client runs)
self._updating = atomic.AtomicLong(0)
self._shard_meta_map = dict()
self._timer = Timer(Constant.SHARD_META_REFRESH_TIMEOUT)
self.__update_shard_meta_once()
        # pub / sub setup: clamp the async worker thread count between MIN_ASYNC_THREAD_LIMIT and MAX_ASYNC_THREAD_LIMIT
        thread_num = max(min(common_config.async_thread_limit, Constant.MAX_ASYNC_THREAD_LIMIT), Constant.MIN_ASYNC_THREAD_LIMIT)
        queue_limit = common_config.thread_queue_limit
        # A subscription id means consumer mode (reader); otherwise producer mode (writer)
        if sub_id:
self._message_reader, self._message_writer = MessageReader(self, queue_limit, thread_num), None
else:
self._message_reader, self._message_writer = None, MessageWriter(self, queue_limit, thread_num)
def close(self):
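        """Close the owned message writer / reader, if any."""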
if self._message_writer:
self._message_writer.close()
if self._message_reader:
self._message_reader.close()
def update_shard_meta(self):
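        """Refresh shard meta once the refresh timer has expired; DatahubException failures are logged and ignored."""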
if self._timer.is_expired():
try:
self.__update_shard_meta_once()
except DatahubException as e:
self._logger.warning("ShardCoordinator update shard meta fail. key: {}. Exception: {}".format(self._class_key, e))
def register(self, coordinator):
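        """Register a coordinator to be notified of shard meta changes."""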
self._coordinators.add(coordinator)
def unregister(self, coordinator):
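        """Remove a coordinator and return the number of coordinators still registered."""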
self._coordinators.remove(coordinator)
return len(self._coordinators)
def __init_topic_meta(self, project_name, topic_name):
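        """Fetch the topic record type and schema once; failures are logged and re-raised."""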
try:
get_topic_result = self._datahub_client.get_topic(project_name, topic_name)
return TopicMeta(project_name, topic_name, get_topic_result.record_type, get_topic_result.record_schema)
except DatahubException as e:
self._logger.warning("Init topic meta fail. key: {}, DatahubException: {}".format(self._class_key, e))
raise e
except Exception as e:
self._logger.warning("Init topic meta fail. key: {}, {}".format(self._class_key, e))
raise e
def __update_shard_meta_once(self):
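        """List shards and rebuild the shard meta map, notifying registered
        coordinators of added / removed shards. Guarded by an atomic flag so
        only one caller refreshes at a time.
        """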
if self._updating.compare_and_set(0, 1):
new_shard_map = dict()
try:
list_shard_result = self._datahub_client.list_shard(self._topic_meta.project_name, self._topic_meta.topic_name)
for shard in list_shard_result.shards:
new_shard_map[shard.shard_id] = ShardMeta(shard.shard_id, self._endpoint, shard.state, list_shard_result.protocol)
            except DatahubException as e:
                self._logger.warning("Update shard meta fail. key: {}, DatahubException: {}".format(self._class_key, e))
                # Release the updating flag before re-raising, otherwise later refreshes would be skipped forever
                self._updating.compare_and_set(1, 0)
                raise e
            except Exception as e:
                self._logger.warning("Update shard meta fail. key: {}, {}".format(self._class_key, e))
                self._updating.compare_and_set(1, 0)
                raise e
new_add = [k for k in new_shard_map if k not in self._shard_meta_map]
new_del = [k for k in self._shard_meta_map if k not in new_shard_map]
if len(new_add) > 0 or len(new_del) > 0:
self._logger.debug("Shard changed when update shard meta. key: {}, new_add: {}, new_del: {}".format(self._class_key, new_add, new_del))
self._shard_meta_map = new_shard_map
for coordinator in self._coordinators:
coordinator.on_shard_meta_change(new_add, new_del)
self._timer.reset()
self._logger.debug("Update shard meta success. key: {}".format(self._class_key))
self._updating.compare_and_set(1, 0)
@property
def class_key(self):
return self._class_key
@property
def shard_meta_map(self):
return self._shard_meta_map
@property
def topic_meta(self):
return self._topic_meta
@property
def datahub_client(self):
return self._datahub_client
@property
def message_reader(self):
return self._message_reader
@property
def message_writer(self):
return self._message_writer
class TopicMeta:
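    """Snapshot of a topic (project name, topic name, record type and schema), exposed via read-only properties."""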
def __init__(self, project_name, topic_name, record_type, record_schema):
self._project_name = project_name
self._topic_name = topic_name
self._record_type = RecordType(record_type)
self._record_schema = record_schema
@property
def project_name(self):
return self._project_name
@property
def topic_name(self):
return self._topic_name
@property
def record_type(self):
return self._record_type
@property
def record_schema(self):
return self._record_schema
class ShardMeta:
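    """Snapshot of a shard (shard id, service address, state and protocol), exposed via read-only properties."""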
def __init__(self, shard_id, address, shard_state, protocol):
self._shard_id = shard_id
self._address = address
self._shard_state = shard_state
self._protocol = protocol
@property
def shard_id(self):
return self._shard_id
@property
def address(self):
return self._address
@property
def shard_state(self):
return self._shard_state
@property
def protocol(self):
return self._protocol