datahub/models/results.py (1,042 lines of code) (raw):
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# 'License'); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# 'AS IS' BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import
import abc
import json
import six
from datahub.exceptions import DatahubException
from .connector import ConnectorType, ConnectorState, get_connector_builder_by_type, \
ConnectorShardStatus, ShardStatusEntry
from .record import FailedRecord, BlobRecord, TupleRecord, RecordType
from .schema import RecordSchema
from .shard import Shard, ShardBase, ShardContext
from .subscription import Subscription, OffsetWithBatchIndex
from ..batch.batch_serializer import BatchSerializer
from ..batch.utils import SchemaObject
from ..proto.datahub_record_proto_pb import GetRecordsResponse, PutRecordsResponse, GetBinaryRecordsResponse
from ..utils import to_text, unwrap_pb_frame
@six.add_metaclass(abc.ABCMeta)
class Result(object):
"""
Abstract class to be implement
"""
@classmethod
@abc.abstractmethod
def parse_content(cls, content, **kwargs):
pass
@abc.abstractmethod
def to_json(self):
pass
def __repr__(self):
return to_text(self.to_json())
class ListProjectResult(Result):
"""
Request params of list projects api
Members:
project_names (:class:`list`): list of project names
"""
__slots__ = '_project_names'
def __init__(self, project_names):
self._project_names = project_names
@property
def project_names(self):
return self._project_names
@project_names.setter
def project_names(self, value):
self._project_names = value
@classmethod
def parse_content(cls, content, **kwargs):
content = json.loads(to_text(content))
return cls(content['ProjectNames'])
def to_json(self):
return {
'ProjectNames': self._project_names
}
class GetProjectResult(Result):
"""
Result of get project api
Members:
project_name (:class:`str`): project name
comment (:class:`str`): project description
create_time (:class:`int`): create time
last_modify_time(:class:`int`): last modify time
"""
__slots__ = ('_project_name', '_comment', '_create_time', '_last_modify_time')
def __init__(self, project_name, comment, create_time, last_modify_time):
self._project_name = project_name
self._comment = comment
self._create_time = create_time
self._last_modify_time = last_modify_time
@property
def project_name(self):
return self._project_name
@project_name.setter
def project_name(self, value):
self._project_name = value
@property
def comment(self):
return self._comment
@comment.setter
def comment(self, value):
self._comment = value
@property
def create_time(self):
return self._create_time
@create_time.setter
def create_time(self, value):
self._create_time = value
@property
def last_modify_time(self):
return self._last_modify_time
@last_modify_time.setter
def last_modify_time(self, value):
self._last_modify_time = value
@classmethod
def parse_content(cls, content, **kwargs):
content = json.loads(to_text(content))
return cls(kwargs['project_name'], content['Comment'], content['CreateTime'], content['LastModifyTime'])
def to_json(self):
return {
'ProjectName': self._project_name,
'Comment': self._comment,
'CreateTime': self._create_time,
'LastModifyTime': self.last_modify_time
}
class ListTopicResult(Result):
"""
Result of list topics api
Members:
topic_names (:class:`list`): list of topic names
"""
__slots__ = '_topic_names'
def __init__(self, topic_names):
self._topic_names = topic_names
@property
def topic_names(self):
return self._topic_names
@topic_names.setter
def topic_names(self, value):
self._topic_names = value
@classmethod
def parse_content(cls, content, **kwargs):
content = json.loads(to_text(content))
return cls(content['TopicNames'])
def to_json(self):
return {
'TopicNames': self._topic_names
}
class GetTopicResult(Result):
"""
Result of get topic api
Members:
project_name (:class:`str`): project name
topic_name (:class:`str`): topic name
shard_count (:class:`int`) shard count
life_cycle (:class:`int`) life cycle
record_type (:class:`datahub.models.RecordType`): record type
record_schema (:class:`datahub.models.RecordSchema`): record schema
comment (:class:`str`): project description
create_time (:class:`int`): create time
last_modify_time(:class:`int`): last modify time
"""
__slots__ = ('_project_name', '_topic_name', '_shard_count', '_life_cycle', '_record_type', '_record_schema',
'_comment', '_create_time', '_last_modify_time')
def __init__(self, **kwargs):
self._project_name = kwargs['project_name'] if 'project_name' in kwargs else ''
self._topic_name = kwargs['topic_name'] if 'topic_name' in kwargs else ''
self._shard_count = kwargs['shard_count'] if 'shard_count' in kwargs else 0
self._life_cycle = kwargs['life_cycle'] if 'life_cycle' in kwargs else 0
self._record_type = RecordType(kwargs['record_type']) if 'record_type' in kwargs else ''
self._record_schema = kwargs['record_schema'] if 'record_schema' in kwargs else None
self._comment = kwargs['comment'] if 'comment' in kwargs else ''
self._create_time = kwargs['create_time'] if 'create_time' in kwargs else 0
self._last_modify_time = kwargs['last_modify_time'] if 'last_modify_time' in kwargs else 0
@property
def project_name(self):
return self._project_name
@project_name.setter
def project_name(self, value):
self._project_name = value
@property
def topic_name(self):
return self._topic_name
@topic_name.setter
def topic_name(self, value):
self._topic_name = value
@property
def shard_count(self):
return self._shard_count
@shard_count.setter
def shard_count(self, value):
self._shard_count = value
@property
def life_cycle(self):
return self._life_cycle
@life_cycle.setter
def life_cycle(self, value):
self._life_cycle = value
@property
def record_type(self):
return self._record_type
@record_type.setter
def record_type(self, value):
self._record_type = value
@property
def comment(self):
return self._comment
@comment.setter
def comment(self, value):
self._comment = value
@property
def record_schema(self):
return self._record_schema
@record_schema.setter
def record_schema(self, value):
self._record_schema = value
@property
def create_time(self):
return self._create_time
@create_time.setter
def create_time(self, value):
self._create_time = value
@property
def last_modify_time(self):
return self._last_modify_time
@last_modify_time.setter
def last_modify_time(self, value):
self._last_modify_time = value
@classmethod
def parse_content(cls, content, **kwargs):
content = json.loads(to_text(content))
record_schema = None
record_type = RecordType(content['RecordType'])
if record_type == RecordType.TUPLE:
record_schema = RecordSchema.from_json_str(content['RecordSchema'])
return cls(project_name=kwargs['project_name'], topic_name=kwargs['topic_name'],
comment=content['Comment'], create_time=content['CreateTime'],
last_modify_time=content['LastModifyTime'], life_cycle=content['Lifecycle'],
record_type=record_type, record_schema=record_schema, shard_count=content['ShardCount'])
def to_json(self):
data = {
'ProjectName': self._project_name,
'TopicName': self._topic_name,
'Comment': self._comment,
'CreateTime': self._create_time,
'LastModifyTime': self._last_modify_time,
'Lifecycle': self._life_cycle,
'RecordType': self._record_type.value,
'ShardCount': self._shard_count
}
if self._record_type == RecordType.TUPLE:
data['RecordSchema'] = self._record_schema.to_json()
return data
class ListShardResult(Result):
"""
Result of list shards api
Members:
shards (:class:`list`): list of :obj:`datahub.models.Shard`
"""
__slots__ = '_shards', '_protocol', '_interval'
def __init__(self, shards, protocol, interval):
self._shards = shards
self._protocol = protocol
self._interval = interval
@property
def shards(self):
return self._shards
@shards.setter
def shards(self, value):
self._shards = value
@property
def protocol(self):
return self._protocol
@protocol.setter
def protocol(self, value):
self._protocol = value
@property
def interval(self):
return self._interval
@interval.setter
def interval(self, value):
self._interval = value
@classmethod
def parse_content(cls, content, **kwargs):
content = json.loads(to_text(content))
shards = [Shard.from_dict(item) for item in content['Shards']]
protocol = content['Protocol']
interval = content['Interval']
return cls(shards, protocol, interval)
def to_json(self):
return {
'Shards': [shard.to_json() for shard in self._shards],
'Protocol': self._protocol,
'Interval': self._interval
}
class MergeShardResult(ShardBase, Result):
"""
Result of merge shard api
Members:
shard_id (:class:`str`): shard id
begin_hash_key (:class:`str`): begin hash key
end_hash_key (:class:`str`): end hash key
"""
__slots__ = ('_shard_id', '_begin_hash_key', '_end_hash_key')
def __init__(self, shard_id, begin_hash_key, end_hash_key):
super(MergeShardResult, self).__init__(shard_id, begin_hash_key, end_hash_key)
@classmethod
def parse_content(cls, content, **kwargs):
content = json.loads(to_text(content))
return cls(content['ShardId'], content['BeginHashKey'], content['EndHashKey'])
def to_json(self):
return super(MergeShardResult, self).to_json()
class SplitShardResult(Result):
"""
Result of split shard api
Members:
new_shards (:class:`list`): list of :obj:`datahub.models.ShardBase`
"""
__slots__ = '_new_shards'
def __init__(self, new_shards):
self._new_shards = new_shards
@property
def new_shards(self):
return self._new_shards
@new_shards.setter
def new_shards(self, value):
self._new_shards = value
@classmethod
def parse_content(cls, content, **kwargs):
content = json.loads(to_text(content))
new_shards = [ShardBase.from_dict(item) for item in content['NewShards']]
return cls(new_shards)
def to_json(self):
return {
'NewShards': [shard.to_json() for shard in self._new_shards]
}
class GetCursorResult(Result):
"""
Request params of get cursor api
Members:
cursor (:class:`str`): cursor
record_time (:class:`int`): record time
sequence (:class:`int`): sequence
"""
__slots__ = ('_cursor', '_record_time', '_sequence')
def __init__(self, cursor, record_time, sequence):
self._cursor = cursor
self._record_time = record_time
self._sequence = sequence
@property
def cursor(self):
return self._cursor
@cursor.setter
def cursor(self, value):
self._cursor = value
@property
def record_time(self):
return self._record_time
@record_time.setter
def record_time(self, value):
self._record_time = value
@property
def sequence(self):
return self._sequence
@sequence.setter
def sequence(self, value):
self._sequence = value
@classmethod
def parse_content(cls, content, **kwargs):
content = json.loads(to_text(content))
return cls(content['Cursor'], content['RecordTime'], content['Sequence'])
def to_json(self):
return {
'Cursor': self._cursor,
'Sequence': self._sequence,
'RecordTime': self._record_time
}
class PutRecordsResult(Result):
"""
Result of put records api
Members:
failed_record_count (:class:`int`): failed record count
failed_records (:class:`list`): list of :obj:`datahub.models.FailedRecord`
"""
__slots__ = ('_failed_record_count', '_failed_records')
def __init__(self, failed_record_count, failed_records):
self._failed_record_count = failed_record_count
self._failed_records = failed_records
@property
def failed_record_count(self):
return self._failed_record_count
@failed_record_count.setter
def failed_record_count(self, value):
self._failed_record_count = value
@property
def failed_records(self):
return self._failed_records
@failed_records.setter
def failed_records(self, value):
self._failed_records = value
@classmethod
def parse_content(cls, content, **kwargs):
content = json.loads(to_text(content))
failed_records = [
FailedRecord(item['Index'], item['ErrorCode'], item['ErrorMessage'])
for item in content['FailedRecords']
]
return cls(content['FailedRecordCount'], failed_records)
def to_json(self):
return {
'FailedRecordCount': self._failed_record_count,
'FailedRecords': [failed_record.to_json() for failed_record in self._failed_records]
}
class PutPBRecordsResult(PutRecordsResult):
"""
Protobuf Result of put records api
"""
@classmethod
def parse_content(cls, content, **kwargs):
crc, compute_crc, pb_str = unwrap_pb_frame(content)
if crc != compute_crc:
raise DatahubException('Parse pb response body fail, error: crc check error. crc: %s, compute crc: %s'
% (crc, compute_crc))
pb_put_record_response = PutRecordsResponse()
pb_put_record_response.ParseFromString(pb_str)
pb_failed_records = pb_put_record_response.failed_records
failed_records = [FailedRecord.from_pb_message(pb_failed_record) for pb_failed_record in pb_failed_records]
return cls(pb_put_record_response.failed_count, failed_records)
class GetRecordsResult(Result):
"""
Result of get records api
Members:
next_cursor (:class:`str`): next cursor
record_count (:class:`int`): record count
start_squ (:class:`int`): start sequence
records (:class:`list`): list of :obj:`datahub.models.BlobRecord`/:obj:`datahub.models.TupleRecord`
"""
__slots__ = ('_next_cursor', '_record_count', '_start_seq', '_records')
def __init__(self, next_cursor, record_count, start_seq, records):
self._next_cursor = next_cursor
self._record_count = record_count
self._start_seq = start_seq
self._records = records
@property
def next_cursor(self):
return self._next_cursor
@next_cursor.setter
def next_cursor(self, value):
self._next_cursor = value
@property
def record_count(self):
return self._record_count
@record_count.setter
def record_count(self, value):
self._record_count = value
@property
def start_seq(self):
return self._start_seq
@start_seq.setter
def start_seq(self, value):
self._start_seq = value
@property
def records(self):
return self._records
@records.setter
def records(self, value):
self._records = value
@classmethod
def parse_content(cls, content, **kwargs):
content = json.loads(to_text(content))
records = []
for item in content['Records']:
data = item['Data']
if isinstance(data, six.string_types):
record = BlobRecord(values=data)
else:
record_schema = kwargs['record_schema']
record = TupleRecord(schema=record_schema, values=data)
if 'Attributes' in item:
record.attributes = item['Attributes']
record.sequence = item['Sequence']
record.system_time = item['SystemTime']
records.append(record)
return cls(content['NextCursor'], content['RecordCount'], content['StartSeq'], records)
def to_json(self):
return {
'NextCursor': self._next_cursor,
'RecordCount': self._record_count,
'StartSeq': self._start_seq,
'Records': [record.to_json() for record in self._records]
}
class GetPBRecordsResult(GetRecordsResult):
"""
Protobuf Result of get records api
"""
@classmethod
def parse_content(cls, content, **kwargs):
crc, compute_crc, pb_str = unwrap_pb_frame(content)
if crc != compute_crc:
raise DatahubException('Parse pb response body fail, error: crc check error. crc: %s, compute crc: %s'
% (crc, compute_crc))
pb_get_record_response = GetRecordsResponse()
pb_get_record_response.ParseFromString(pb_str)
next_cursor = pb_get_record_response.next_cursor
record_count = pb_get_record_response.record_count
start_sequence = pb_get_record_response.start_sequence
records = []
for pb_record in pb_get_record_response.records:
record_schema = kwargs['record_schema']
if record_schema:
values = [bp_field_data.value for bp_field_data in pb_record.data.data]
record = TupleRecord(schema=record_schema)
record._set_values(values)
else:
record = BlobRecord(blob_data=pb_record.data.data[0].value)
record._attributes = {
attribute.key: attribute.value for attribute in pb_record.attributes.attributes
}
record.system_time = pb_record.system_time
record.sequence = pb_record.sequence
records.append(record)
return cls(next_cursor, record_count, start_sequence, records)
class GetBatchRecordsResult(GetRecordsResult):
"""
Batch Result of get records api
"""
@classmethod
def parse_content(cls, content, **kwargs):
crc, compute_crc, pb_str = unwrap_pb_frame(content)
if crc != compute_crc:
raise DatahubException('Parse pb response body fail, error: crc check error. crc: %s, compute crc: %s'
% (crc, compute_crc))
project_name = kwargs['project_name']
topic_name = kwargs['topic_name']
init_schema = kwargs['init_schema']
schema_register = kwargs['schema_register']
pb_get_record_response = GetBinaryRecordsResponse()
pb_get_record_response.ParseFromString(pb_str)
next_cursor = pb_get_record_response.next_cursor
record_count = 0
start_sequence = pb_get_record_response.start_sequence
total_records_list = []
schema_object = SchemaObject(project_name, topic_name, schema_register)
for i in range(pb_get_record_response.record_count):
pb_record = pb_get_record_response.records[i]
byte_data = pb_record.data
records_list = BatchSerializer.deserialize(init_schema, schema_object, byte_data)
index, records_len = 0, len(records_list)
for record in records_list:
record.system_time = pb_record.system_time
record.sequence = pb_record.sequence
record.batch_size = records_len
record.batch_index = index
index += 1
total_records_list += records_list
record_count += records_len
return cls(next_cursor, record_count, start_sequence, total_records_list)
class GetMeteringInfoResult(Result):
"""
Result of get metering info api;
Members:
active_time (:class:`int`): active time
storage (:class:`int`): storage
"""
__slots__ = ('_active_time', '_storage')
def __init__(self, active_time, storage):
self._active_time = active_time
self._storage = storage
@property
def active_time(self):
return self._active_time
@active_time.setter
def active_time(self, value):
self._active_time = value
@property
def storage(self):
return self._storage
@storage.setter
def storage(self, value):
self._storage = value
@classmethod
def parse_content(cls, content, **kwargs):
content = json.loads(to_text(content))
return cls(content['ActiveTime'], content['Storage'])
def to_json(self):
return {
'ActiveTime': self._active_time,
'Storage': self._storage
}
class ListConnectorResult(Result):
"""
Result of list data connector
Members:
connector_names (:class:`list`): list of data connector names
"""
__slots__ = '_connector_names'
def __init__(self, connector_names, connector_ids):
self._connector_names = connector_names
self._connector_ids = connector_ids
@property
def connector_names(self):
return self._connector_names
@connector_names.setter
def connector_names(self, value):
self._connector_names = value
@property
def connector_ids(self):
return self._connector_ids
@connector_ids.setter
def connector_ids(self, value):
self._connector_ids = value
@classmethod
def parse_content(cls, content, **kwargs):
content = json.loads(to_text(content))
return cls(content['Connectors'], content['Connectors'])
def to_json(self):
return {
'Connectors': self._connector_names
}
class CreateConnectorResult(Result):
"""
Result of create connector
Members:
connector_id (:class:`str`): connector id
"""
__slots__ = '_connector_id'
def __init__(self, connector_id):
self._connector_id = connector_id
@property
def connector_id(self):
return self._connector_id
@connector_id.setter
def connector_id(self, value):
self._connector_id = value
@classmethod
def parse_content(cls, content, **kwargs):
content = json.loads(to_text(content))
return cls(content.get('ConnectorId', ''))
def to_json(self):
return {
'ConnectorId': self._connector_id
}
class GetConnectorResult(Result):
"""
Result of get data connector
Members:
cluster_addr (:class:`str`): cluster address
connector_id (:class:`str`): connector id
type (:class:`datahub.models.ConnectorType`): connector type
state (:class:`datahub.models.ConnectorState`): connector state
creator (:class:`str`): creator
owner (:class:`str`): owner
create_time (:class:`int`): create time
column_fields (:class:`list`): list of column fields
config (:class:`datahub.models.OdpsConnectorConfig`): config
extra_config (:class:`dict`): extra config
shard_contexts (:class:`list`): list of :obj:`datahub.models.ShardContext`
sub_id (:class:`str`): subscription id used by connector
"""
__slots__ = (
'_cluster_addr', '_connector_id', '_type', '_state', '_creator', '_owner', '_create_time', '_column_fields',
'_config', '_extra_config', '_shard_contexts', '_sub_id')
def __init__(self, cluster_addr, connector_id, connector_type, state, creator, owner, create_time, column_fields,
config, extra_config, shard_contexts, sub_id):
self._cluster_addr = cluster_addr
self._connector_id = connector_id
self._type = connector_type
self._state = state
self._creator = creator
self._create_time = create_time
self._column_fields = column_fields
self._owner = owner
self._config = config
self._extra_config = extra_config
self._shard_contexts = shard_contexts
self._sub_id = sub_id
@property
def cluster_addr(self):
return self._cluster_addr
@cluster_addr.setter
def cluster_addr(self, value):
self._cluster_addr = value
@property
def connector_id(self):
return self._connector_id
@connector_id.setter
def connector_id(self, value):
self._connector_id = value
@property
def create_time(self):
return self._create_time
@create_time.setter
def create_time(self, value):
self._create_time = value
@property
def column_fields(self):
return self._column_fields
@column_fields.setter
def column_fields(self, value):
self._column_fields = value
@property
def type(self):
return self._type
@type.setter
def type(self, value):
self._type = value
@property
def state(self):
return self._state
@state.setter
def state(self, value):
self._state = value
@property
def creator(self):
return self._creator
@creator.setter
def creator(self, value):
self._creator = value
@property
def owner(self):
return self._owner
@owner.setter
def owner(self, value):
self._owner = value
@property
def config(self):
return self._config
@config.setter
def config(self, value):
self._config = value
@property
def extra_config(self):
return self._extra_config
@extra_config.setter
def extra_config(self, value):
self._extra_config = value
@property
def shard_contexts(self):
return self._shard_contexts
@shard_contexts.setter
def shard_contexts(self, value):
self._shard_contexts = value
@property
def sub_id(self):
return self._sub_id
@sub_id.setter
def sub_id(self, value):
self._sub_id = value
@classmethod
def parse_content(cls, content, **kwargs):
content = json.loads(to_text(content))
cluster_addr = content.get('ClusterAddress', '')
connector_id = content.get('ConnectorId', '')
connector_type = ConnectorType(content['Type'])
state = ConnectorState(content['State'])
creator = content.get('Creator', '')
owner = content.get('Owner', '')
create_time = content.get('CreateTime', 0)
column_fields = content.get('ColumnFields', [])
connector_config = get_connector_builder_by_type(connector_type).from_dict(content['Config'])
extra_config = content.get('ExtraInfo', {})
shard_contexts = [ShardContext.from_dict(item) for item in content['ShardContexts']] # deprecated
sub_id = extra_config.get('SubscriptionId', '')
return cls(cluster_addr, connector_id, connector_type, state, creator, owner, create_time, column_fields,
connector_config, extra_config, shard_contexts, sub_id)
def to_json(self):
return {
'ClusterAddress': self._cluster_addr,
'ConnectorId': self._connector_id,
'Type': self._type.value,
'State': self.state.value,
'Creator': self._creator,
'Owner': self._owner,
'CreateTime': self._create_time,
'ColumnFields': self._column_fields,
'ConnectorConfig': self._config.to_json(),
'ExtraInfo': self._extra_config,
}
class GetConnectorShardStatusResult(Result):
"""
Result of get data connector shard status
Members:
shard_status_infos (:class:`dict`): shard status entry map
"""
__slots__ = '_shard_status_infos'
def __init__(self, shard_status_infos):
self._shard_status_infos = shard_status_infos
@property
def shard_status_infos(self):
return self._shard_status_infos
@shard_status_infos.setter
def shard_status_infos(self, value):
self._shard_status_infos = value
@classmethod
def parse_content(cls, content, **kwargs):
content = json.loads(to_text(content))
shard_status_infos = {}
if 'ShardStatusInfos' in content:
for (k, v) in content.get('ShardStatusInfos', {}).items():
shard_status_infos.update({
k: ShardStatusEntry.from_dict(v)
})
else:
shard_status_infos[content['ShardId']] = ShardStatusEntry.from_dict(content)
return cls(shard_status_infos)
def to_json(self):
shard_status_infos = {}
for (k, v) in self._shard_status_infos.items():
shard_status_infos.update({
k: v.to_json()
})
return {
'ShardStatusInfos': shard_status_infos
}
class InitAndGetSubscriptionOffsetResult(Result):
"""
Result of init and get subscription offset api
Members:
offsets (:class:`list`): list of :obj:`datahub.models.OffsetWithSession`
"""
__slots__ = '_offsets'
def __init__(self, offsets):
self._offsets = offsets
@property
def offsets(self):
return self._offsets
@offsets.setter
def offsets(self, value):
self._offsets = value
@classmethod
def parse_content(cls, content, **kwargs):
content = json.loads(to_text(content))
offsets = {}
for (k, v) in content['Offsets'].items():
offsets.update({
k: OffsetWithBatchIndex.from_dict(v)
})
return cls(offsets)
def to_json(self):
offsets = {}
for (k, v) in self._offsets.items():
offsets.update({
k: v.to_json()
})
return {
'Offsets': offsets
}
class GetSubscriptionOffsetResult(Result):
"""
Result of get subscription offset api
Members:
offsets (:class:`list`): list of :obj:`datahub.models.OffsetWithVersion`
"""
__slots__ = '_offsets'
def __init__(self, offsets):
self._offsets = offsets
@property
def offsets(self):
return self._offsets
@offsets.setter
def offsets(self, value):
self._offsets = value
@classmethod
def parse_content(cls, content, **kwargs):
content = json.loads(to_text(content))
offsets = {}
for (k, v) in content['Offsets'].items():
offsets.update({
k: OffsetWithBatchIndex.from_dict(v)
})
return cls(offsets)
def to_json(self):
offsets = {}
for (k, v) in self._offsets.items():
offsets.update({
k: v.to_json()
})
return {
'Offsets': offsets
}
class GetConnectorDoneTimeResult(Result):
"""
Result of get connector done time api
Members:
done_time (:class`int`): done time
time_zone (:class`str`): time zone
time_window (:class`int`): time window
"""
__slots__ = ('_done_time', '_time_zone', '_time_window')
def __init__(self, done_time, time_zone, time_window):
self._done_time = done_time
self._time_zone = time_zone
self._time_window = time_window
@property
def done_time(self):
return self._done_time
@done_time.setter
def done_time(self, value):
self._done_time = value
@property
def time_zone(self):
return self._time_zone
@time_zone.setter
def time_zone(self, value):
self._time_zone = value
@property
def time_window(self):
return self._time_window
@time_window.setter
def time_window(self, value):
self._time_window = value
@classmethod
def parse_content(cls, content, **kwargs):
content = json.loads(to_text(content))
done_time = content.get('DoneTime', 0)
time_zone = content.get('TimeZone', '')
time_window = content.get('TimeWindow', 0)
return cls(done_time, time_zone, time_window)
def to_json(self):
return {
'DoneTime': self._done_time,
'TimeZone': self._time_zone,
'TimeWindow': self._time_window
}
class CreateSubscriptionResult(Result):
"""
Result of create subscription api
Members:
sub_id (:class:`str`): subscription id
"""
__slots__ = '_sub_id'
def __init__(self, sub_id):
self._sub_id = sub_id
@property
def sub_id(self):
return self._sub_id
@sub_id.setter
def sub_id(self, value):
self._sub_id = value
@classmethod
def parse_content(cls, content, **kwargs):
content = json.loads(to_text(content))
return cls(content['SubId'])
def to_json(self):
return {
'SubId': self._sub_id
}
class GetSubscriptionResult(Subscription, Result):
"""
Result of get subscription api
Members:
comment (:class:`str`): comment
create_time (:class:`int`): create time
is_owner (:class:`bool`): owner or not
last_modify_time (:class:`int`): last modify time
state (:class:`str`): state
update_time (:class:`int`): update time
record_time (:class:`int`): record time
discard_count (:class:`int`): discard count
"""
__slots__ = ('_comment', '_create_time', '_is_owner', '_last_modify_time',
'_state', '_sub_id', '_topic_name', '_type')
def __init__(self, comment, create_time, is_owner, last_modify_time,
state, sub_id, topic_name, sub_type):
super(GetSubscriptionResult, self).__init__(comment, create_time, is_owner, last_modify_time,
state, sub_id, topic_name, sub_type)
@classmethod
def parse_content(cls, content, **kwargs):
content = json.loads(to_text(content))
return GetSubscriptionResult.from_dict(content)
def to_json(self):
return super(GetSubscriptionResult, self).to_json()
class ListSubscriptionResult(Result):
"""
Result of query subscription api
"""
__slots__ = ('_total_count', '_subscriptions')
def __init__(self, total_count, subscriptions):
self._total_count = total_count
self._subscriptions = subscriptions
@property
def subscriptions(self):
return self._subscriptions
@subscriptions.setter
def subscriptions(self, value):
self._subscriptions = value
@property
def total_count(self):
return self._total_count
@total_count.setter
def total_count(self, value):
self._total_count = value
@classmethod
def parse_content(cls, content, **kwargs):
content = json.loads(to_text(content))
subscriptions = [Subscription.from_dict(item) for item in content['Subscriptions']]
return cls(content['TotalCount'], subscriptions)
def to_json(self):
return {
'TotalCount': self._total_count,
'Subscriptions': [subscription.to_json() for subscription in self._subscriptions]
}
class JoinGroupResult(Result):
"""
Result of join group api
"""
__slots__ = '_consumer_id', '_version_id', '_session_timeout'
def __init__(self, consumer_id, version_id, session_timeout):
self._consumer_id = consumer_id
self._version_id = version_id
self._session_timeout = session_timeout
@property
def consumer_id(self):
return self._consumer_id
@consumer_id.setter
def consumer_id(self, consumer_id):
self._consumer_id = consumer_id
@property
def version_id(self):
return self._version_id
@version_id.setter
def version_id(self, version_id):
self._version_id = version_id
@property
def session_timeout(self):
return self._session_timeout
@session_timeout.setter
def session_timeout(self, session_timeout):
self._session_timeout = session_timeout
@classmethod
def parse_content(cls, content, **kwargs):
content = json.loads(to_text(content))
consumer_id = content.get("ConsumerId", 0)
version_id = content.get("VersionId", 0)
session_timeout = content.get("SessionTimeout", 0)
return cls(consumer_id, version_id, session_timeout)
def to_json(self):
return {
"ConsumerId": self._consumer_id,
"VersionId": self._version_id,
"SessionTimeout": self._session_timeout
}
class HeartBeatResult(Result):
"""
Result of heart beat api
"""
__slots__ = '_plan_version', '_shard_list', '_total_plan'
def __init__(self, plan_version, shard_list, total_plan):
self._plan_version = plan_version
self._shard_list = shard_list
self._total_plan = total_plan
@property
def plan_version(self):
return self._plan_version
@plan_version.setter
def plan_version(self, plan_version):
self._plan_version = plan_version
@property
def shard_list(self):
return self._shard_list
@shard_list.setter
def shard_list(self, shard_list):
self._shard_list = shard_list
@property
def total_plan(self):
return self._total_plan
@total_plan.setter
def total_plan(self, total_plan):
self._total_plan = total_plan
@classmethod
def parse_content(cls, content, **kwargs):
content = json.loads(to_text(content))
plan_version = content.get("PlanVersion", 0)
shard_list = content.get("ShardList", [])
total_plan = content.get("TotalPlan", "")
return cls(plan_version, shard_list, total_plan)
def to_json(self):
return {
"PlanVersion": self._plan_version,
"ShardList": self._shard_list,
"TotalPlan": self._total_plan
}
class SyncGroupResult(Result):
"""
Result of sync group api
"""
def __int__(self):
super(SyncGroupResult, self).__init__()
@classmethod
def parse_content(cls, content, **kwargs):
return cls()
def to_json(self):
return {}
class LeaveGroupResult(Result):
"""
Result of leave group api
"""
def __int__(self):
super(LeaveGroupResult, self).__init__()
@classmethod
def parse_content(cls, content, **kwargs):
return cls()
def to_json(self):
return {}
class ListTopicSchemaResult(Result):
"""
Result of list topic schema
"""
__slots__ = '_page_number', '_page_size', '_page_count', '_total_count', '_record_schema_list'
def __init__(self, page_number, page_size, page_count, total_count, record_schema_list):
self._page_number = page_number
self._page_size = page_size
self._page_count = page_count
self._total_count = total_count
self._record_schema_list = record_schema_list
@property
def page_number(self):
return self._page_number
@page_number.setter
def page_number(self, value):
self._page_number = value
@property
def page_size(self):
return self._page_size
@page_size.setter
def page_size(self, value):
self._page_size = value
@property
def page_count(self):
return self._page_count
@page_count.setter
def page_count(self, value):
self._page_count = value
@property
def total_count(self):
return self._total_count
@total_count.setter
def total_count(self, value):
self._total_count = value
@property
def record_schema_list(self):
return self._record_schema_list
@record_schema_list.setter
def record_schema_list(self, value):
self._record_schema_list = value
@classmethod
def parse_content(cls, content, **kwargs):
content = json.loads(to_text(content))
page_number = content.get("PageNumber", 0)
page_size = content.get("PageSize", 0)
page_count = content.get("PageCount", 0)
total_count = content.get("TotalCount", 0)
record_schema_list = content.get("RecordSchemaList", [])
return cls(page_number, page_size, page_count, total_count, record_schema_list)
def to_json(self):
return {
"PageNumber": self._page_number,
"PageSize": self._page_size,
"PageCount": self._page_count,
"TotalCount": self._total_count,
"RecordSchemaList": self._record_schema_list
}
class GetTopicSchemaResult(Result):
"""
Result of get topic schema
"""
__slots__ = '_version_id', '_create_time', '_creator', '_record_schema'
def __init__(self, version_id, create_time, creator, record_schema):
self._version_id = version_id
self._create_time = create_time
self._creator = creator
self._record_schema = record_schema
@property
def version_id(self):
return self._version_id
@version_id.setter
def version_id(self, value):
self._version_id = value
@property
def create_time(self):
return self._create_time
@create_time.setter
def create_time(self, value):
self._create_time = value
@property
def creator(self):
return self._creator
@creator.setter
def creator(self, value):
self._creator = value
@property
def record_schema(self):
return self._record_schema
@record_schema.setter
def record_schema(self, value):
self._record_schema = value
@classmethod
def parse_content(cls, content, **kwargs):
content = json.loads(to_text(content))
version_id = content.get("VersionId", 0)
create_time = content.get("CreateTime", 0)
creator = content.get("Creator", 0)
record_schema = content.get("RecordSchema", 0)
return cls(version_id, create_time, creator, record_schema)
def to_json(self):
return {
"VersionId": self._version_id,
"CreateTime": self._create_time,
"Creator": self._creator,
"RecordSchema": self._record_schema
}
class RegisterTopicSchemaResult(Result):
"""
Result of register topic schema
"""
__slots__ = '_version_id'
def __init__(self, version_id):
self._version_id = version_id
@property
def version_id(self):
return self._version_id
@version_id.setter
def version_id(self, value):
self._version_id = value
@classmethod
def parse_content(cls, content, **kwargs):
content = json.loads(to_text(content))
version_id = content.get("VersionId", 0)
return cls(version_id)
def to_json(self):
return {
"VersionId": self._version_id,
}
class DeleteTopicSchemaResult(Result):
"""
Result of delete topic schema
"""
def __init__(self):
super(DeleteTopicSchemaResult, self).__init__()
@classmethod
def parse_content(cls, content, **kwargs):
return cls()
def to_json(self):
return {}