datahub/client/common/shard_coordinator.py (77 lines of code) (raw):
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import logging
from datahub.exceptions import DatahubException
from .library_factory import LibraryFactory
class ShardCoordinator(object):
    """Coordinate shard assignment for one project/topic (and optional subscription).

    The coordinator builds a unique key from the project name, topic name and
    subscription id, obtains a meta data object from ``LibraryFactory`` and
    forwards shard changes to callbacks registered by its owner.

    Inherits from ``object`` explicitly so that the property setter below also
    behaves as a descriptor on Python 2 (no effect on Python 3).
    """

    def __init__(self, project_name, topic_name, sub_id, common_config):
        """Initialise the coordinator and fetch its meta data object.

        Args:
            project_name: Name of the project; first component of the unique key.
            topic_name: Name of the topic; second component of the unique key.
            sub_id: Subscription id; appended to the unique key when truthy.
            common_config: Configuration object; its ``endpoint`` attribute is
                read here and the object is passed on to
                ``LibraryFactory.get_meta_data``.
        """
        self._closed = False
        self._logger = logging.getLogger(ShardCoordinator.__name__)
        self._endpoint = common_config.endpoint
        self._project_name = project_name
        self._topic_name = topic_name
        self._sub_id = sub_id
        # Plain state is initialised before ``self`` is handed to the factory,
        # so a callback fired during registration never sees a half-built
        # coordinator.
        self._assign_shard_list = []
        self._shard_change = None
        self._remove_all_shards = None
        self._uniq_key = None
        self._gen_uniq_key()
        # NOTE(review): presumably the factory looks the coordinator up by
        # ``uniq_key`` - confirm in LibraryFactory.
        self._meta_data = LibraryFactory.get_meta_data(self, common_config)

    def close(self):
        """Mark the coordinator closed and pass it to ``LibraryFactory.remove_meta_data``.

        Every call invokes ``remove_meta_data`` again; there is no guard
        against repeated calls.
        """
        self._closed = True
        LibraryFactory.remove_meta_data(self)

    def update_shard_info(self):
        """Ask the meta data object to update its shard meta.

        Raises:
            DatahubException: If the coordinator has been closed.
        """
        self._raise_if_closed("ShardCoordinator closed when update shard info")
        self._meta_data.update_shard_meta()

    def register_shard_change(self, shard_change_callback):
        """Store the callback invoked as ``callback(add_shards, del_shards)``."""
        self._shard_change = shard_change_callback

    def register_remove_all_shards(self, remove_all_shards_callback):
        """Store the zero-argument callback used by ``_do_remove_all_shards``."""
        self._remove_all_shards = remove_all_shards_callback

    def on_shard_meta_change(self, add_shards, del_shards):
        """Forward a shard change unless shards were assigned explicitly.

        Args:
            add_shards: Shards that were added (may be empty or ``None``).
            del_shards: Shards that were removed (may be empty or ``None``).

        Raises:
            DatahubException: If the change is forwarded while the
                coordinator is closed.
        """
        if not self.is_user_shard_assign():
            self._do_shard_change(add_shards, del_shards)

    def is_user_shard_assign(self):
        """Return ``True`` when ``assign_shard_list`` holds at least one shard.

        ``None`` is treated like an empty list instead of raising
        ``TypeError``, because the setter accepts any value.
        """
        return bool(self._assign_shard_list)

    @property
    def assign_shard_list(self):
        """The explicitly assigned shard list (the stored object, not a copy)."""
        return self._assign_shard_list

    @assign_shard_list.setter
    def assign_shard_list(self, value):
        self._assign_shard_list = value

    @property
    def endpoint(self):
        """Endpoint taken from ``common_config.endpoint`` at construction."""
        return self._endpoint

    @property
    def project_name(self):
        """Project name given at construction."""
        return self._project_name

    @property
    def topic_name(self):
        """Topic name given at construction."""
        return self._topic_name

    @property
    def sub_id(self):
        """Subscription id given at construction."""
        return self._sub_id

    @property
    def meta_data(self):
        """Object returned by ``LibraryFactory.get_meta_data``."""
        return self._meta_data

    @property
    def uniq_key(self):
        """Key of the form ``project:topic[:sub_id][:suffix]``."""
        return self._uniq_key

    def _raise_if_closed(self, message):
        """Log a warning and raise ``DatahubException(message)`` when closed."""
        if self._closed:
            self._logger.warning("{}. key: {}".format(message, self._uniq_key))
            raise DatahubException(message)

    def _do_shard_change(self, add_shards, del_shards):
        """Invoke the shard-change callback when there is something to report.

        The callback is skipped when none is registered or when both
        ``add_shards`` and ``del_shards`` are empty/``None``.

        Raises:
            DatahubException: If the coordinator has been closed.
        """
        self._raise_if_closed("ShardCoordinator closed when shard change")
        if self._shard_change and (add_shards or del_shards):
            self._shard_change(add_shards, del_shards)

    def _do_remove_all_shards(self):
        """Invoke the remove-all-shards callback, if one is registered.

        Raises:
            DatahubException: If the coordinator has been closed.
        """
        self._raise_if_closed("ShardCoordinator closed when remove all shards")
        if self._remove_all_shards:
            self._remove_all_shards()

    def _gen_uniq_key(self, suffix=None):
        """Build ``self._uniq_key`` if it has not been built yet.

        Args:
            suffix: Optional extra component appended after the subscription id.
        """
        # NOTE(review): the key is generated once; a later call with a suffix
        # leaves an existing key untouched - confirm this nesting is intended.
        if not self._uniq_key:
            self._uniq_key = "{}:{}".format(self._project_name, self._topic_name)
            if self._sub_id:
                self._uniq_key += (":" + self._sub_id)
            if suffix:
                self._uniq_key += (":" + suffix)