datahub/implement.py (672 lines of code) (raw):
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import time
import urllib3
from .batch.schema_registry_client import SchemaRegistryClient
from .models.params import *
from .models.results import *
from .auth import AliyunAccount
from .exceptions import InvalidParameterException, InvalidOperationException
from .models import ShardState, OffsetBase, SubscriptionState, FieldType, OffsetWithSession
from .rest import Path
from .rest import RestClient
from .utils import check_project_name_valid, check_topic_name_valid, check_type, check_positive, \
to_text, ErrorMessage, check_empty, check_negative
urllib3.disable_warnings()
class DataHubJson(object):
"""
Datahub json client
"""
def __init__(self, access_id, access_key, endpoint=None, compress_format=None, enable_schema_register=False, **kwargs):
self._account = kwargs.pop('account', None)
if self._account is None:
security_token = kwargs.pop('security_token', '')
self._account = AliyunAccount(access_id=access_id, access_key=access_key, security_token=security_token)
self._endpoint = endpoint
self._compress_format = compress_format
self._rest_client = RestClient(self._account, self._endpoint, **kwargs)
def list_project(self):
url = Path.PROJECTS
content = self._rest_client.get(url)
result = ListProjectResult.parse_content(content)
return result
def create_project(self, project_name, comment):
if not check_project_name_valid(project_name):
raise InvalidParameterException(ErrorMessage.INVALID_PROJECT_NAME)
url = Path.PROJECT % project_name
request_param = CreateProjectRequestParams(comment)
self._rest_client.post(url, data=request_param.content())
def get_project(self, project_name):
if check_empty(project_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'project_name')
url = Path.PROJECT % project_name
content = self._rest_client.get(url)
result = GetProjectResult.parse_content(to_text(content), project_name=project_name)
return result
def update_project(self, project_name, comment):
if check_empty(project_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'project_name')
url = Path.PROJECT % project_name
request_param = UpdateProjectRequestParams(comment)
self._rest_client.put(url, data=request_param.content())
def delete_project(self, project_name):
if check_empty(project_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'project_name')
url = Path.PROJECT % project_name
self._rest_client.delete(url)
def list_topic(self, project_name):
if check_empty(project_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'project_name')
url = Path.TOPICS % project_name
content = self._rest_client.get(url)
result = ListTopicResult.parse_content(content)
return result
def create_blob_topic(self, project_name, topic_name, shard_count, life_cycle, extend_mode, comment):
self.__create_topic(project_name, topic_name, shard_count, life_cycle, RecordType.BLOB, comment, extend_mode)
def create_tuple_topic(self, project_name, topic_name, shard_count, life_cycle, record_schema, extend_mode,
comment):
self.__create_topic(project_name, topic_name, shard_count, life_cycle, RecordType.TUPLE, comment, extend_mode,
record_schema)
def get_topic(self, project_name, topic_name):
if check_empty(project_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'project_name')
if check_empty(topic_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'topic_name')
url = Path.TOPIC % (project_name, topic_name)
content = self._rest_client.get(url)
result = GetTopicResult.parse_content(to_text(content), project_name=project_name,
topic_name=topic_name)
return result
def update_topic(self, project_name, topic_name, life_cycle, comment):
if check_empty(project_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'project_name')
if check_empty(topic_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'topic_name')
if not check_positive(life_cycle):
raise InvalidParameterException(ErrorMessage.PARAMETER_NOT_POSITIVE % 'life_cycle')
url = Path.TOPIC % (project_name, topic_name)
request_param = UpdateTopicRequestParams(life_cycle, comment)
self._rest_client.put(url, data=request_param.content())
def delete_topic(self, project_name, topic_name):
if check_empty(project_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'project_name')
if check_empty(topic_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'topic_name')
url = Path.TOPIC % (project_name, topic_name)
self._rest_client.delete(url)
def append_field(self, project_name, topic_name, field_name, field_type):
if check_empty(project_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'project_name')
if check_empty(topic_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'topic_name')
if check_empty(field_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'field_name')
if not check_type(field_type, FieldType):
raise InvalidParameterException(ErrorMessage.INVALID_TYPE %
('field_type', FieldType.__name__))
url = Path.TOPIC % (project_name, topic_name)
request_param = AppendFieldParams(field_name, field_type)
self._rest_client.post(url, data=request_param.content())
def wait_shards_ready(self, project_name, topic_name, timeout=30):
if check_empty(project_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'project_name')
if check_empty(topic_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'topic_name')
if check_negative(timeout):
raise InvalidParameterException(ErrorMessage.PARAMETER_NEGATIVE % 'timeout')
current_time = time.time()
end_time = current_time + timeout
while current_time <= end_time:
if self.__is_shard_load_completed(project_name, topic_name):
return
time.sleep(1)
current_time = time.time()
raise DatahubException(ErrorMessage.WAIT_SHARD_TIMEOUT)
def list_shard(self, project_name, topic_name):
if check_empty(project_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'project_name')
if check_empty(topic_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'topic_name')
url = Path.SHARDS % (project_name, topic_name)
content = self._rest_client.get(url)
result = ListShardResult.parse_content(content)
return result
def merge_shard(self, project_name, topic_name, shard_id, adj_shard_id):
if check_empty(project_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'project_name')
if check_empty(topic_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'topic_name')
if check_empty(shard_id):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'shard_id')
if check_empty(adj_shard_id):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'adj_shard_id')
url = Path.SHARDS % (project_name, topic_name)
request_param = MergeShardRequestParams(shard_id, adj_shard_id)
content = self._rest_client.post(url, data=request_param.content())
result = MergeShardResult.parse_content(content)
return result
def split_shard(self, project_name, topic_name, shard_id, split_key=''):
if check_empty(project_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'project_name')
if check_empty(topic_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'topic_name')
if check_empty(shard_id):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'shard_id')
if check_empty(split_key):
shard_info = self.__get_shard(project_name, topic_name, shard_id)
if not check_empty(shard_info):
begin_hash_key = int(shard_info.begin_hash_key, 16)
end_hash_key = int(shard_info.end_hash_key, 16)
split_key = hex(int((begin_hash_key + end_hash_key) // 2))[2:34]
url = Path.SHARDS % (project_name, topic_name)
request_param = SplitShardRequestParams(shard_id, split_key)
content = self._rest_client.post(url, data=request_param.content())
result = SplitShardResult.parse_content(content)
return result
def extend_shard(self, project_name, topic_name, shard_count):
if check_empty(project_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'project_name')
if check_empty(topic_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'topic_name')
if not check_positive(shard_count):
raise InvalidParameterException(ErrorMessage.PARAMETER_NOT_POSITIVE % 'shard_count')
url = Path.SHARDS % (project_name, topic_name)
request_param = ExtendShardRequestParams(shard_count)
self._rest_client.post(url, data=request_param.content())
def get_cursor(self, project_name, topic_name, shard_id, cursor_type, param=-1):
if check_empty(project_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'project_name')
if check_empty(topic_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'topic_name')
if check_empty(shard_id):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'shard_id')
if not check_type(cursor_type, CursorType):
raise InvalidParameterException(ErrorMessage.INVALID_TYPE % ('cursor_type', CursorType.__name__))
if CursorType.SYSTEM_TIME == cursor_type and param < 0:
raise InvalidParameterException(ErrorMessage.MISSING_SYSTEM_TIME)
elif CursorType.SEQUENCE == cursor_type and param < 0:
raise InvalidParameterException(ErrorMessage.MISSING_SEQUENCE)
url = Path.SHARD % (project_name, topic_name, shard_id)
request_param = GetCursorRequestParams(cursor_type, param)
content = self._rest_client.post(url, data=request_param.content())
result = GetCursorResult.parse_content(content)
return result
def put_records(self, project_name, topic_name, record_list):
if check_empty(project_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'project_name')
if check_empty(topic_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'topic_name')
url = Path.SHARDS % (project_name, topic_name)
request_param = PutRecordsRequestParams(record_list)
content = self._rest_client.post(url, data=request_param.content(), headers=request_param.extra_headers(),
compress_format=self._compress_format)
result = PutRecordsResult.parse_content(content)
return result
def put_records_by_shard(self, project_name, topic_name, shard_id, record_list):
raise DatahubException('put_records_by_shard api only support pb mode')
def get_blob_records(self, project_name, topic_name, sub_id, shard_id, cursor, limit_num):
return self.__get_records(project_name, topic_name, sub_id, shard_id, cursor, limit_num)
def get_tuple_records(self, project_name, topic_name, sub_id, shard_id, record_schema, cursor, limit_num):
return self.__get_records(project_name, topic_name, sub_id, shard_id, cursor, limit_num, record_schema)
def get_metering_info(self, project_name, topic_name, shard_id):
if check_empty(project_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'project_name')
if check_empty(topic_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'topic_name')
if check_empty(shard_id):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'shard_id')
url = Path.SHARD % (project_name, topic_name, shard_id)
request_param = GetMeteringInfoRequestParams()
content = self._rest_client.post(url, data=request_param.content())
result = GetMeteringInfoResult.parse_content(content)
return result
def list_connector(self, project_name, topic_name):
if check_empty(project_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'project_name')
if check_empty(topic_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'topic_name')
url = Path.CONNECTORS % (project_name, topic_name) + "?mode=id"
content = self._rest_client.get(url)
result = ListConnectorResult.parse_content(content)
return result
def create_connector(self, project_name, topic_name, connector_type, column_fields, config, start_time):
if check_empty(project_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'project_name')
if check_empty(topic_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'topic_name')
url = Path.CONNECTOR % (project_name, topic_name, connector_type.value)
request_param = CreateConnectorParams(column_fields, config, start_time)
content = self._rest_client.post(url, data=request_param.content())
result = CreateConnectorResult.parse_content(content)
return result
def update_connector(self, project_name, topic_name, connector_id, config):
if check_empty(project_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'project_name')
if check_empty(topic_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'topic_name')
url = Path.CONNECTOR % (project_name, topic_name, connector_id)
request_param = UpdateConnectorParams(config)
self._rest_client.post(url, data=request_param.content())
def get_connector(self, project_name, topic_name, connector_id):
if check_empty(project_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'project_name')
if check_empty(topic_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'topic_name')
url = Path.CONNECTOR % (project_name, topic_name, connector_id)
content = self._rest_client.get(url)
result = GetConnectorResult.parse_content(content)
return result
def delete_connector(self, project_name, topic_name, connector_id):
if check_empty(project_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'project_name')
if check_empty(topic_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'topic_name')
url = Path.CONNECTOR % (project_name, topic_name, connector_id)
self._rest_client.delete(url)
def get_connector_shard_status(self, project_name, topic_name, connector_id, shard_id=''):
if check_empty(project_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'project_name')
if check_empty(topic_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'topic_name')
url = Path.CONNECTOR % (project_name, topic_name, connector_id)
request_param = GetConnectorShardStatusParams(shard_id)
content = self._rest_client.post(url, data=request_param.content())
result = GetConnectorShardStatusResult.parse_content(content)
return result
def reload_connector(self, project_name, topic_name, connector_id, shard_id=''):
if check_empty(project_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'project_name')
if check_empty(topic_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'topic_name')
url = Path.CONNECTOR % (project_name, topic_name, connector_id)
request_param = ReloadConnectorParams(shard_id)
self._rest_client.post(url, data=request_param.content())
def append_connector_field(self, project_name, topic_name, connector_id, field_name):
if check_empty(project_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'project_name')
if check_empty(topic_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'topic_name')
if check_empty(field_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'field_name')
url = Path.CONNECTOR % (project_name, topic_name, connector_id)
request_param = AppendConnectorFieldParams(field_name)
self._rest_client.post(url, data=request_param.content())
def get_connector_done_time(self, project_name, topic_name, connector_id):
if check_empty(project_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'project_name')
if check_empty(topic_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'topic_name')
url = Path.DONE_TIME % (project_name, topic_name, connector_id)
content = self._rest_client.get(url)
result = GetConnectorDoneTimeResult.parse_content(content)
return result
def update_connector_state(self, project_name, topic_name, connector_id, connector_state):
if check_empty(project_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'project_name')
if check_empty(topic_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'topic_name')
url = Path.CONNECTOR % (project_name, topic_name, connector_id)
request_param = UpdateConnectorStateParams(connector_state)
self._rest_client.post(url, data=request_param.content())
def update_connector_offset(self, project_name, topic_name, connector_id, shard_id, connector_offset):
if check_empty(project_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'project_name')
if check_empty(topic_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'topic_name')
url = Path.CONNECTOR % (project_name, topic_name, connector_id)
request_param = UpdateConnectorOffsetParams(shard_id, connector_offset)
self._rest_client.post(url, data=request_param.content())
def init_and_get_subscription_offset(self, project_name, topic_name, sub_id, shard_ids):
if check_empty(project_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'project_name')
if check_empty(topic_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'topic_name')
if check_empty(sub_id):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'sub_id')
if check_empty(shard_ids):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'shard_ids')
if isinstance(shard_ids, six.string_types):
shard_ids = [shard_ids]
if not check_type(shard_ids, list):
raise InvalidParameterException(ErrorMessage.INVALID_TYPE % ('shard_ids', list.__name__))
url = Path.OFFSETS % (project_name, topic_name, sub_id)
request_param = InitAndGetSubscriptionOffsetParams(shard_ids)
content = self._rest_client.post(url, data=request_param.content())
result = InitAndGetSubscriptionOffsetResult.parse_content(content)
return result
def get_subscription_offset(self, project_name, topic_name, sub_id, shard_ids=None):
if check_empty(project_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'project_name')
if check_empty(topic_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'topic_name')
if check_empty(sub_id):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'sub_id')
if isinstance(shard_ids, six.string_types):
shard_ids = [shard_ids]
url = Path.OFFSETS % (project_name, topic_name, sub_id)
request_param = GetSubscriptionOffsetParams(shard_ids)
content = self._rest_client.post(url, data=request_param.content())
result = GetSubscriptionOffsetResult.parse_content(content)
return result
def update_subscription_offset(self, project_name, topic_name, sub_id, offsets):
if check_empty(project_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'project_name')
if check_empty(topic_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'topic_name')
if check_empty(sub_id):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'sub_id')
if not check_type(offsets, dict):
raise InvalidParameterException(ErrorMessage.INVALID_TYPE % ('offsets', dict.__name__))
for (k, v) in offsets.items():
if isinstance(v, dict):
offsets[k] = OffsetWithSession.from_dict(v)
url = Path.OFFSETS % (project_name, topic_name, sub_id)
request_param = UpdateSubscriptionOffsetParams(offsets)
self._rest_client.put(url, data=request_param.content())
def join_group(self, project_name, topic_name, consumer_group, session_timeout):
if check_empty(project_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'project_name')
if check_empty(topic_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'topic_name')
if consumer_group is None or len(consumer_group) == 0:
raise InvalidParameterException("Consumer group format is invalid")
url = Path.SUBSCRIPTION % (project_name, topic_name, consumer_group)
request_param = JoinGroupParams(session_timeout)
content = self._rest_client.post(url, data=request_param.content())
result = JoinGroupResult.parse_content(content)
return result
def heart_beat(self, project_name, topic_name, consumer_group, consumer_id, version_id, hold_shard_list, read_end_shard_list):
if check_empty(project_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'project_name')
if check_empty(topic_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'topic_name')
if consumer_group is None or len(consumer_group) == 0:
raise InvalidParameterException("Consumer group format is invalid")
if hold_shard_list is None:
raise InvalidParameterException("Hold shard list is none")
url = Path.SUBSCRIPTION % (project_name, topic_name, consumer_group)
request_param = HeartBeatParams(consumer_id, version_id, hold_shard_list, read_end_shard_list)
content = self._rest_client.post(url, data=request_param.content())
result = HeartBeatResult.parse_content(content)
return result
def sync_group(self, project_name, topic_name, consumer_group, consumer_id, version_id, release_shard_list, read_end_shard_list):
if check_empty(project_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'project_name')
if check_empty(topic_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'topic_name')
if consumer_group is None or len(consumer_group) == 0:
raise InvalidParameterException("Consumer group format is invalid")
url = Path.SUBSCRIPTION % (project_name, topic_name, consumer_group)
request_param = SyncGroupParams(consumer_id, version_id, release_shard_list, read_end_shard_list)
content = self._rest_client.post(url, data=request_param.content())
result = SyncGroupResult.parse_content(content)
return result
def leave_group(self, project_name, topic_name, consumer_group, consumer_id, version_id):
if check_empty(project_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'project_name')
if check_empty(topic_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'topic_name')
if consumer_group is None or len(consumer_group) == 0:
raise InvalidParameterException("Consumer group format is invalid")
url = Path.SUBSCRIPTION % (project_name, topic_name, consumer_group)
request_param = LeaveGroupParams(consumer_id, version_id)
content = self._rest_client.post(url, data=request_param.content())
result = LeaveGroupResult.parse_content(content)
return result
def list_topic_schema(self, project_name, topic_name, page_number=-1, page_size=-1):
if check_empty(project_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'project_name')
if check_empty(topic_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'topic_name')
url = Path.TOPIC % (project_name, topic_name)
request_param = ListTopicSchemaParams(page_number, page_size)
content = self._rest_client.post(url, data=request_param.content())
result = ListTopicSchemaResult.parse_content(content)
return result
def get_topic_schema(self, project_name, topic_name, schema=None, version_id=-1):
if check_empty(project_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'project_name')
if check_empty(topic_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'topic_name')
if schema is None and version_id == -1:
raise InvalidParameterException("Error. Schema and version_id are both empty.")
url = Path.TOPIC % (project_name, topic_name)
request_param = GetTopicSchemaParams(version_id, schema)
content = self._rest_client.post(url, data=request_param.content())
result = GetTopicSchemaResult.parse_content(content)
return result
def register_topic_schema(self, project_name, topic_name, schema):
if check_empty(project_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'project_name')
if check_empty(topic_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'topic_name')
url = Path.TOPIC % (project_name, topic_name)
request_param = RegisterTopicSchemaParams(schema)
content = self._rest_client.post(url, data=request_param.content())
result = RegisterTopicSchemaResult.parse_content(content)
return result
def delete_topic_schema(self, project_name, topic_name, version_id):
if check_empty(project_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'project_name')
if check_empty(topic_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'topic_name')
url = Path.TOPIC % (project_name, topic_name)
request_param = DeleteTopicSchemaParams(version_id)
content = self._rest_client.post(url, data=request_param.content())
result = DeleteTopicSchemaResult.parse_content(content)
return result
# =======================================================
# internal api
# =======================================================
def create_subscription(self, project_name, topic_name, comment):
if check_empty(project_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'project_name')
if check_empty(topic_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'topic_name')
url = Path.SUBSCRIPTIONS % (project_name, topic_name)
request_param = CreateSubscriptionParams(comment)
content = self._rest_client.post(url, data=request_param.content())
result = CreateSubscriptionResult.parse_content(content)
return result
def delete_subscription(self, project_name, topic_name, sub_id):
if check_empty(project_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'project_name')
if check_empty(topic_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'topic_name')
if check_empty(sub_id):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'sub_id')
url = Path.SUBSCRIPTION % (project_name, topic_name, sub_id)
self._rest_client.delete(url)
def get_subscription(self, project_name, topic_name, sub_id):
if check_empty(project_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'project_name')
if check_empty(topic_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'topic_name')
if check_empty(sub_id):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'sub_id')
url = Path.SUBSCRIPTION % (project_name, topic_name, sub_id)
content = self._rest_client.get(url)
result = GetSubscriptionResult.parse_content(content)
return result
def update_subscription(self, project_name, topic_name, sub_id, comment):
if check_empty(project_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'project_name')
if check_empty(topic_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'topic_name')
if check_empty(sub_id):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'sub_id')
url = Path.SUBSCRIPTION % (project_name, topic_name, sub_id)
request_param = UpdateSubscriptionParams(comment)
self._rest_client.put(url, data=request_param.content())
def update_subscription_state(self, project_name, topic_name, sub_id, state):
if check_empty(project_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'project_name')
if check_empty(topic_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'topic_name')
if check_empty(sub_id):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'sub_id')
if not check_type(state, SubscriptionState):
raise InvalidParameterException(ErrorMessage.INVALID_TYPE %
('state', SubscriptionState.__name__))
url = Path.SUBSCRIPTION % (project_name, topic_name, sub_id)
request_param = UpdateSubscriptionStateParams(state)
self._rest_client.put(url, data=request_param.content())
def list_subscription(self, project_name, topic_name, query_key, page_index, page_size):
if check_empty(project_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'project_name')
if check_empty(topic_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'topic_name')
if not check_positive(page_index):
raise InvalidParameterException(ErrorMessage.PARAMETER_NOT_POSITIVE % 'page_index')
if check_negative(page_size):
raise InvalidParameterException(ErrorMessage.PARAMETER_NEGATIVE % 'page_size')
url = Path.SUBSCRIPTIONS % (project_name, topic_name)
request_param = ListSubscriptionParams(query_key, page_index, page_size)
content = self._rest_client.post(url, data=request_param.content())
result = ListSubscriptionResult.parse_content(content)
return result
def reset_subscription_offset(self, project_name, topic_name, sub_id, offsets):
if check_empty(project_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'project_name')
if check_empty(topic_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'topic_name')
if check_empty(sub_id):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'sub_id')
if check_empty(offsets):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'offsets')
if not check_type(offsets, dict):
raise InvalidParameterException(ErrorMessage.INVALID_TYPE % ('offsets', dict.__name__))
for (k, v) in offsets.items():
if isinstance(v, dict):
offsets[k] = OffsetBase.from_dict(v)
url = Path.OFFSETS % (project_name, topic_name, sub_id)
request_param = ResetSubscriptionOffsetParams(offsets)
self._rest_client.put(url, data=request_param.content())
# =======================================================
# private function
# =======================================================
def __is_shard_load_completed(self, project_name, topic_name):
shards = self.list_shard(project_name, topic_name)
for shard in shards.shards:
if shard.state not in (ShardState.ACTIVE, ShardState.CLOSED):
return False
return True
def __get_shard(self, project_name, topic_name, shard_id):
shards = self.list_shard(project_name, topic_name)
for shard in shards.shards:
if shard.shard_id == shard_id:
return shard
def __create_topic(self, project_name, topic_name, shard_count, life_cycle, record_type, comment,
extend_mode=None, record_schema=None):
if check_empty(project_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'project_name')
if not check_topic_name_valid(topic_name):
raise InvalidParameterException(ErrorMessage.INVALID_TOPIC_NAME)
if not check_positive(life_cycle):
raise InvalidParameterException(ErrorMessage.PARAMETER_NOT_POSITIVE % 'life_cycle')
if not check_type(record_type, RecordType):
raise InvalidParameterException(ErrorMessage.INVALID_TYPE % ('record_type', RecordType.__name__))
if record_type == RecordType.TUPLE:
if not check_type(record_schema, RecordSchema):
raise InvalidParameterException(ErrorMessage.INVALID_RECORD_SCHEMA_TYPE)
if extend_mode is not None and not isinstance(extend_mode, bool):
raise InvalidParameterException('type of extend mode should be \'bool\'')
url = Path.TOPIC % (project_name, topic_name)
request_param = CreateTopicRequestParams(shard_count, life_cycle, record_type, record_schema, extend_mode, comment)
self._rest_client.post(url, data=request_param.content())
def __get_records(self, project_name, topic_name, sub_id, shard_id, cursor, limit_num, record_schema=None):
if check_empty(project_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'project_name')
if check_empty(topic_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'topic_name')
if check_empty(shard_id):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'shard_id')
if check_empty(cursor):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'cursor')
if check_type(cursor, GetCursorResult):
raise InvalidParameterException(ErrorMessage.INVALID_TYPE % ('cursor', 'str'))
if sub_id is not None:
raise InvalidOperationException("Json protocol not support this method")
url = Path.SHARD % (project_name, topic_name, shard_id)
request_param = GetRecordsRequestParams(cursor, limit_num)
content = self._rest_client.post(url, data=request_param.content(), headers=request_param.extra_headers(),
compress_format=self._compress_format)
result = GetRecordsResult.parse_content(content, record_schema=record_schema)
return result
class DataHubPB(DataHubJson):
"""
DataHub protobuf client
"""
def __init__(self, access_id, access_key, endpoint=None, compress_format=None, enable_schema_register=False, **kwargs):
super().__init__(access_id, access_key, endpoint, compress_format, enable_schema_register, **kwargs)
def put_records(self, project_name, topic_name, record_list):
if check_empty(project_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'project_name')
if check_empty(topic_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'topic_name')
if record_list is None or len(record_list) == 0:
raise InvalidParameterException("Record list is null or empty")
url = Path.SHARDS % (project_name, topic_name)
request_param = PutPBRecordsRequestParams(record_list)
content = self._rest_client.post(url, data=request_param.content(), headers=request_param.extra_headers(),
compress_format=self._compress_format)
result = PutPBRecordsResult.parse_content(content)
return result
def put_records_by_shard(self, project_name, topic_name, shard_id, record_list):
if check_empty(project_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'project_name')
if check_empty(topic_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'topic_name')
if check_empty(shard_id):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'shard_id')
if record_list is None or len(record_list) == 0:
raise InvalidParameterException("Record list is null or empty")
url = Path.SHARD % (project_name, topic_name, shard_id)
request_param = PutPBRecordsRequestParams(record_list)
self._rest_client.post(url, data=request_param.content(), headers=request_param.extra_headers(),
compress_format=self._compress_format)
def get_blob_records(self, project_name, topic_name, sub_id, shard_id, cursor, limit_num):
return self.__get_records(project_name, topic_name, sub_id, shard_id, cursor, limit_num)
def get_tuple_records(self, project_name, topic_name, sub_id, shard_id, record_schema, cursor, limit_num):
return self.__get_records(project_name, topic_name, sub_id, shard_id, cursor, limit_num, record_schema)
def __get_records(self, project_name, topic_name, sub_id, shard_id, cursor, limit_num, record_schema=None):
if check_empty(project_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'project_name')
if check_empty(topic_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'topic_name')
if check_empty(shard_id):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'shard_id')
if check_empty(cursor):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'cursor')
url = Path.SHARD % (project_name, topic_name, shard_id)
request_param = GetPBRecordsRequestParams(cursor, limit_num)
content = self._rest_client.post(url, data=request_param.content(), headers=request_param.extra_headers(sub_id),
compress_format=self._compress_format)
result = GetPBRecordsResult.parse_content(content, record_schema=record_schema)
return result
class DataHubBatch(DataHubJson):
"""
DataHub batch client
"""
def __init__(self, access_id, access_key, endpoint=None, compress_format=None, enable_schema_register=True, **kwargs):
super().__init__(access_id, access_key, endpoint, compress_format, enable_schema_register, **kwargs)
self._schema_register = SchemaRegistryClient(self) if enable_schema_register else None
def put_records(self, project_name, topic_name, record_list):
raise DatahubException("This method is not supported for batch client, please use put_records_by_shard")
def put_records_by_shard(self, project_name, topic_name, shard_id, record_list):
if check_empty(project_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'project_name')
if check_empty(topic_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'topic_name')
if check_empty(shard_id):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'shard_id')
if record_list is None or len(record_list) == 0:
raise InvalidParameterException("Record list is null or empty")
url = Path.SHARD % (project_name, topic_name, shard_id)
request_param = PutBatchRecordsRequestParams(record_list, project_name, topic_name, self._compress_format,
self._schema_register)
self._rest_client.post(url, data=request_param.content(), headers=request_param.extra_headers())
def get_blob_records(self, project_name, topic_name, sub_id, shard_id, cursor, limit_num):
return self.__get_records(project_name, topic_name, sub_id, shard_id, cursor, limit_num)
def get_tuple_records(self, project_name, topic_name, sub_id, shard_id, record_schema, cursor, limit_num):
return self.__get_records(project_name, topic_name, sub_id, shard_id, cursor, limit_num, record_schema)
def __get_records(self, project_name, topic_name, sub_id, shard_id, cursor, limit_num, record_schema=None):
if check_empty(project_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'project_name')
if check_empty(topic_name):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'topic_name')
if check_empty(shard_id):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'shard_id')
if check_empty(cursor):
raise InvalidParameterException(ErrorMessage.PARAMETER_EMPTY % 'cursor')
url = Path.SHARD % (project_name, topic_name, shard_id)
request_param = GetBatchRecordsRequestParams(cursor, limit_num)
content = self._rest_client.post(url, data=request_param.content(), headers=request_param.extra_headers(sub_id),
compress_format=self._compress_format)
result = GetBatchRecordsResult.parse_content(content, record_schema=record_schema, project_name=project_name,
topic_name=topic_name, init_schema=record_schema,
schema_register=self._schema_register if record_schema else None)
return result