datahub/models/params.py (881 lines of code) (raw):
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import
import abc
import json
import six
from cprotobuf.internal import encode_data
from ..batch.batch_serializer import BatchSerializer
from ..batch.utils import SchemaObject
from ..models import CursorType, RecordType, RecordSchema
from ..proto.datahub_record_proto_pb import PutRecordsRequest, GetRecordsRequest, PutBinaryRecordsRequest
from ..rest import ContentType, Headers
from ..utils import pb_message_wrap
@six.add_metaclass(abc.ABCMeta)
class RequestParams(object):
    """
    Abstract base class of all request params.

    Subclasses must implement :meth:`content`, which produces the request
    body. :meth:`extra_headers` returns an empty dict here and may be
    overridden to supply additional headers.
    """

    @abc.abstractmethod
    def content(self):
        """Return the serialized request body."""
        pass

    @staticmethod
    def extra_headers():
        """Return extra request headers; the base implementation has none."""
        return {}

    def __repr__(self):
        """
        Return the request body as the textual representation.

        ``__repr__`` must return a ``str``. A body that is not a ``str``
        (presumably the wrapped protobuf payloads built by some subclasses
        -- TODO confirm against ``pb_message_wrap``) is passed through
        ``repr()`` instead of letting the interpreter raise ``TypeError``.
        """
        body = self.content()
        if isinstance(body, str):
            return body
        return repr(body)
class CreateProjectRequestParams(RequestParams):
    """
    Request params of create project api.

    Carries the project comment and serializes it as the ``Comment``
    field of a JSON body.
    """

    __slots__ = '_comment'

    def __init__(self, comment):
        self._comment = comment

    @property
    def comment(self):
        """Value sent as ``Comment``."""
        return self._comment

    @comment.setter
    def comment(self, value):
        self._comment = value

    def content(self):
        """Return the JSON request body."""
        payload = {"Comment": self._comment}
        return json.dumps(payload)
class UpdateProjectRequestParams(RequestParams):
    """
    Request params of update project api.

    Carries the new project comment and serializes it as the ``Comment``
    field of a JSON body.
    """

    __slots__ = '_comment'

    def __init__(self, comment):
        self._comment = comment

    @property
    def comment(self):
        """Value sent as ``Comment``."""
        return self._comment

    @comment.setter
    def comment(self, value):
        self._comment = value

    def content(self):
        """Return the JSON request body."""
        payload = {"Comment": self._comment}
        return json.dumps(payload)
class CreateTopicRequestParams(RequestParams):
    """
    Request params of create topic api.

    The JSON body always carries ``ShardCount``, ``Lifecycle``,
    ``RecordType`` and ``Comment``. ``RecordSchema`` is added for tuple
    topics, and ``ExpandMode`` is added when an extend mode is given.
    """

    __slots__ = ('_shard_count', '_life_cycle', '_record_type', '_record_schema',
                 '_extend_mode', '_comment')

    def __init__(self, shard_count, life_cycle, record_type, record_schema, extend_mode, comment):
        self._shard_count = shard_count
        self._life_cycle = life_cycle
        self._record_type = record_type
        self._record_schema = record_schema
        self._extend_mode = extend_mode
        self._comment = comment

    @property
    def shard_count(self):
        """Value sent as ``ShardCount``."""
        return self._shard_count

    @shard_count.setter
    def shard_count(self, value):
        self._shard_count = value

    @property
    def life_cycle(self):
        """Value sent as ``Lifecycle``."""
        return self._life_cycle

    @life_cycle.setter
    def life_cycle(self, value):
        self._life_cycle = value

    @property
    def record_type(self):
        """Record type; its ``.value`` is sent as ``RecordType``."""
        return self._record_type

    @record_type.setter
    def record_type(self, value):
        self._record_type = value

    @property
    def record_schema(self):
        """Schema sent as ``RecordSchema`` for tuple topics."""
        return self._record_schema

    @record_schema.setter
    def record_schema(self, value):
        self._record_schema = value

    @property
    def extend_mode(self):
        """Flag selecting ``ExpandMode``; ``None`` omits the field."""
        return self._extend_mode

    @extend_mode.setter
    def extend_mode(self, value):
        self._extend_mode = value

    @property
    def comment(self):
        """Value sent as ``Comment``."""
        return self._comment

    @comment.setter
    def comment(self, value):
        self._comment = value

    def content(self):
        """Return the JSON request body."""
        body = {
            "ShardCount": self._shard_count,
            "Lifecycle": self._life_cycle,
            "RecordType": self._record_type.value,
            "Comment": self._comment
        }

        if self._record_type == RecordType.TUPLE:
            schema = self._record_schema
            # A RecordSchema object is serialized; anything else is sent as given.
            if isinstance(schema, RecordSchema):
                schema = schema.to_json_string()
            body['RecordSchema'] = schema

        if self._extend_mode is not None:
            if self._extend_mode:
                body['ExpandMode'] = 'extend'
            else:
                body['ExpandMode'] = 'split'

        return json.dumps(body)
class UpdateTopicRequestParams(RequestParams):
    """
    Request params of update topic api.

    Serializes the life cycle and comment as the ``Lifecycle`` and
    ``Comment`` fields of a JSON body.
    """

    __slots__ = ('_life_cycle', '_comment')

    def __init__(self, life_cycle, comment):
        self._life_cycle = life_cycle
        self._comment = comment

    @property
    def life_cycle(self):
        """Value sent as ``Lifecycle``."""
        return self._life_cycle

    @life_cycle.setter
    def life_cycle(self, value):
        self._life_cycle = value

    @property
    def comment(self):
        """Value sent as ``Comment``."""
        return self._comment

    @comment.setter
    def comment(self, value):
        self._comment = value

    def content(self):
        """Return the JSON request body."""
        payload = {
            "Lifecycle": self._life_cycle,
            "Comment": self._comment
        }
        return json.dumps(payload)
class MergeShardRequestParams(RequestParams):
    """
    Request params of merge shard api.

    The JSON body uses the ``merge`` action and carries the shard id and
    the adjacent shard id.
    """

    __slots__ = ('_shard_id', '_adj_shard_id')

    def __init__(self, shard_id, adj_shard_id):
        self._shard_id = shard_id
        self._adj_shard_id = adj_shard_id

    @property
    def shard_id(self):
        """Value sent as ``ShardId``."""
        return self._shard_id

    @shard_id.setter
    def shard_id(self, value):
        self._shard_id = value

    @property
    def adj_shard_id(self):
        """Value sent as ``AdjacentShardId``."""
        return self._adj_shard_id

    @adj_shard_id.setter
    def adj_shard_id(self, value):
        self._adj_shard_id = value

    def content(self):
        """Return the JSON request body."""
        payload = {
            "Action": "merge",
            "ShardId": self._shard_id,
            "AdjacentShardId": self._adj_shard_id
        }
        return json.dumps(payload)
class SplitShardRequestParams(RequestParams):
    """
    Request params of split shard api.

    The JSON body uses the ``split`` action and carries the shard id and
    the split key.
    """

    __slots__ = ('_shard_id', '_split_key')

    def __init__(self, shard_id, split_key):
        self._shard_id = shard_id
        self._split_key = split_key

    @property
    def shard_id(self):
        """Value sent as ``ShardId``."""
        return self._shard_id

    @shard_id.setter
    def shard_id(self, value):
        self._shard_id = value

    @property
    def split_key(self):
        """Value sent as ``SplitKey``."""
        return self._split_key

    @split_key.setter
    def split_key(self, value):
        self._split_key = value

    def content(self):
        """Return the JSON request body."""
        payload = {
            "Action": "split",
            "ShardId": self._shard_id,
            "SplitKey": self._split_key
        }
        return json.dumps(payload)
class ExtendShardRequestParams(RequestParams):
    """
    Request params of extend shard api.

    The JSON body uses the ``extend`` action with ``ExtendMode`` fixed to
    ``TO`` and carries the shard count as ``ShardNumber``.
    """

    __slots__ = '_shard_count'

    def __init__(self, shard_count):
        self._shard_count = shard_count

    @property
    def shard_count(self):
        """Value sent as ``ShardNumber``."""
        return self._shard_count

    @shard_count.setter
    def shard_count(self, value):
        self._shard_count = value

    def content(self):
        """Return the JSON request body."""
        payload = {
            "Action": "extend",
            "ExtendMode": "TO",
            "ShardNumber": self._shard_count
        }
        return json.dumps(payload)
class GetCursorRequestParams(RequestParams):
    """
    Request params of get cursor api.

    The JSON body uses the ``cursor`` action and the name of the cursor
    type. The extra parameter is sent as ``SystemTime`` or ``Sequence``
    for the matching cursor types and omitted for all others.
    """

    __slots__ = ('_type', '_param')

    def __init__(self, cursor_type, param):
        self._type = cursor_type
        self._param = param

    @property
    def type(self):
        """Cursor type; its ``.name`` is sent as ``Type``."""
        return self._type

    @type.setter
    def type(self, value):
        self._type = value

    @property
    def param(self):
        """Value sent as ``SystemTime`` or ``Sequence``, depending on the type."""
        return self._param

    @param.setter
    def param(self, value):
        self._param = value

    def content(self):
        """Return the JSON request body."""
        body = {
            "Action": "cursor",
            "Type": self._type.name
        }

        cursor_type = self._type
        if cursor_type == CursorType.SYSTEM_TIME:
            body['SystemTime'] = self._param
        elif cursor_type == CursorType.SEQUENCE:
            body['Sequence'] = self._param

        return json.dumps(body)
class PutRecordsRequestParams(RequestParams):
    """
    Request params of put records api.

    The JSON body uses the ``pub`` action and carries the ``to_json()``
    form of every record in the list.
    """

    __slots__ = '_record_list'

    # Action name shared with the protobuf and batch variants.
    action = 'pub'

    def __init__(self, record_list):
        self._record_list = record_list

    @property
    def record_list(self):
        """Records to be serialized into the request body."""
        return self._record_list

    @record_list.setter
    def record_list(self, value):
        self._record_list = value

    def content(self):
        """Return the JSON request body."""
        json_records = [item.to_json() for item in self._record_list]
        payload = {
            "Action": PutRecordsRequestParams.action,
            "Records": json_records
        }
        return json.dumps(payload)
class PutPBRecordsRequestParams(PutRecordsRequestParams):
    """
    Protobuf request params of put records api.

    The body is a ``PutRecordsRequest`` message built from the
    ``to_pb_record_entry()`` form of every record, passed through
    ``pb_message_wrap``.
    """

    def content(self):
        """Return the wrapped, protobuf-encoded request body."""
        entries = [item.to_pb_record_entry() for item in self._record_list]
        request = {'records': entries}
        encoded = encode_data(PutRecordsRequest, request)
        return pb_message_wrap(encoded)

    @staticmethod
    def extra_headers():
        """Return the request-action and protobuf content-type headers."""
        headers = {}
        headers[Headers.REQUEST_ACTION] = PutRecordsRequestParams.action
        headers[Headers.CONTENT_TYPE] = ContentType.HTTP_PROTOBUF.value
        return headers
class PutBatchRecordsRequestParams(PutRecordsRequestParams):
    """
    Batch request params of put records api.

    All records are serialized into a single binary payload by
    ``BatchSerializer`` and sent as the only entry of a
    ``PutBinaryRecordsRequest`` message.
    """

    def __init__(self, record_list, project_name, topic_name, compress_type, schema_register):
        """
        :param record_list: records to serialize into the batch
        :param project_name: project name handed to ``SchemaObject``
        :param topic_name: topic name handed to ``SchemaObject``
        :param compress_type: compress type handed to ``BatchSerializer.serialize``
        :param schema_register: schema register handed to ``SchemaObject``
        """
        # Explicit two-argument super(): the zero-argument form only exists
        # on Python 3, while this module otherwise stays Python 2 compatible.
        super(PutBatchRecordsRequestParams, self).__init__(record_list)
        self._project_name = project_name
        self._topic_name = topic_name
        self._compress_type = compress_type
        self._schema_register = schema_register

    def content(self):
        """Return the wrapped, protobuf-encoded batch request body."""
        schema_object = SchemaObject(self._project_name, self._topic_name, self._schema_register)
        record_data = BatchSerializer.serialize(self._compress_type, schema_object, self._record_list)
        batch_put_record_request = {
            'records': [{'data': record_data}]
        }
        batch_data = encode_data(PutBinaryRecordsRequest, batch_put_record_request)
        return pb_message_wrap(batch_data)

    @staticmethod
    def extra_headers():
        """Return the request-action and batch content-type headers."""
        return {
            Headers.REQUEST_ACTION: PutRecordsRequestParams.action,
            Headers.CONTENT_TYPE: ContentType.HTTP_BATCH.value
        }
class GetRecordsRequestParams(RequestParams):
    """
    Request params of get records api.

    The JSON body uses the ``sub`` action and carries the cursor and the
    record limit.
    """

    __slots__ = ('_cursor', '_limit_num')

    # Action name shared with the protobuf and batch variants.
    action = 'sub'

    def __init__(self, cursor, limit_num):
        self._cursor = cursor
        self._limit_num = limit_num

    @property
    def cursor(self):
        """Value sent as ``Cursor``."""
        return self._cursor

    @cursor.setter
    def cursor(self, value):
        self._cursor = value

    @property
    def limit_num(self):
        """Value sent as ``Limit``."""
        return self._limit_num

    @limit_num.setter
    def limit_num(self, value):
        self._limit_num = value

    def content(self):
        """Return the JSON request body."""
        payload = {
            "Action": GetRecordsRequestParams.action,
            "Cursor": self._cursor,
            "Limit": self._limit_num
        }
        return json.dumps(payload)
class GetPBRecordsRequestParams(GetRecordsRequestParams):
    """
    Protobuf request params of get records api.

    The body is a ``GetRecordsRequest`` message holding the cursor and
    the limit, passed through ``pb_message_wrap``.
    """

    def content(self):
        """Return the wrapped, protobuf-encoded request body."""
        request = {
            'cursor': self._cursor,
            'limit': self._limit_num
        }
        encoded = encode_data(GetRecordsRequest, request)
        return pb_message_wrap(encoded)

    @staticmethod
    def extra_headers(sub_id=None):
        """
        Return the request-action and protobuf content-type headers.

        :param sub_id: added under ``Headers.CONTENT_SUB_ID`` unless it is ``None``
        """
        headers = {
            Headers.REQUEST_ACTION: GetRecordsRequestParams.action,
            Headers.CONTENT_TYPE: ContentType.HTTP_PROTOBUF.value
        }
        if sub_id is None:
            return headers
        headers[Headers.CONTENT_SUB_ID] = sub_id
        return headers
class GetBatchRecordsRequestParams(GetRecordsRequestParams):
    """
    Batch request params of get records api.

    The body is a ``GetRecordsRequest`` message holding the cursor and
    the limit, passed through ``pb_message_wrap``.
    """

    def content(self):
        """Return the wrapped, protobuf-encoded request body."""
        request = {
            'cursor': self._cursor,
            'limit': self._limit_num
        }
        encoded = encode_data(GetRecordsRequest, request)
        return pb_message_wrap(encoded)

    @staticmethod
    def extra_headers(sub_id=None):
        """
        Return the request-action and batch content-type headers.

        :param sub_id: added under ``Headers.CONTENT_SUB_ID`` unless it is ``None``
        """
        headers = {
            Headers.REQUEST_ACTION: GetRecordsRequestParams.action,
            Headers.CONTENT_TYPE: ContentType.HTTP_BATCH.value
        }
        if sub_id is None:
            return headers
        headers[Headers.CONTENT_SUB_ID] = sub_id
        return headers
class GetMeteringInfoRequestParams(RequestParams):
    """
    Request params of get metering info api.

    The JSON body only carries the ``meter`` action.
    """

    def content(self):
        """Return the JSON request body."""
        payload = {"Action": "meter"}
        return json.dumps(payload)
class CreateConnectorParams(RequestParams):
    """
    Request params of create data connector api.

    The JSON body uses the ``Create`` action and carries the column
    fields, the ``to_json()`` form of the config and the sink start time.
    """

    __slots__ = ('_column_fields', '_config', '_start_time')

    def __init__(self, column_fields, config, start_time):
        # Any falsy value (None, empty sequence, ...) becomes a fresh empty list.
        self._column_fields = column_fields if column_fields else []
        self._config = config
        self._start_time = start_time

    @property
    def column_fields(self):
        """Value sent as ``ColumnFields``."""
        return self._column_fields

    @column_fields.setter
    def column_fields(self, value):
        self._column_fields = value

    @property
    def config(self):
        """Connector config; its ``to_json()`` result is sent as ``Config``."""
        return self._config

    @config.setter
    def config(self, value):
        self._config = value

    @property
    def start_time(self):
        """Value sent as ``SinkStartTime``."""
        return self._start_time

    @start_time.setter
    def start_time(self, value):
        self._start_time = value

    def content(self):
        """Return the JSON request body."""
        payload = {
            "Action": "Create",
            "ColumnFields": self._column_fields,
            "Config": self._config.to_json(),
            "SinkStartTime": self._start_time
        }
        return json.dumps(payload)
class UpdateConnectorParams(RequestParams):
    """
    Request params of update data connector config api.

    The JSON body uses the ``updateconfig`` action and carries the
    ``to_json()`` form of the config.
    """

    __slots__ = '_config'

    def __init__(self, config):
        self._config = config

    @property
    def config(self):
        """Connector config; its ``to_json()`` result is sent as ``Config``."""
        return self._config

    @config.setter
    def config(self, value):
        self._config = value

    def content(self):
        """Return the JSON request body."""
        payload = {
            "Action": "updateconfig",
            "Config": self._config.to_json()
        }
        return json.dumps(payload)
class GetConnectorShardStatusParams(RequestParams):
    """
    Request params of get data connector shard status api.

    The JSON body uses the ``Status`` action; ``ShardId`` is only included
    when the shard id is truthy.
    """

    __slots__ = '_shard_id'

    def __init__(self, shard_id):
        self._shard_id = shard_id

    @property
    def shard_id(self):
        """Value sent as ``ShardId`` when truthy."""
        return self._shard_id

    @shard_id.setter
    def shard_id(self, value):
        self._shard_id = value

    def content(self):
        """Return the JSON request body."""
        body = {"Action": "Status"}
        shard = self._shard_id
        if shard:
            body['ShardId'] = shard
        return json.dumps(body)
class ReloadConnectorParams(RequestParams):
    """
    Request params of reload data connector api.

    The JSON body uses the ``Reload`` action; ``ShardId`` is only included
    when the shard id is truthy.
    """

    __slots__ = '_shard_id'

    def __init__(self, shard_id):
        self._shard_id = shard_id

    @property
    def shard_id(self):
        """Value sent as ``ShardId`` when truthy."""
        return self._shard_id

    @shard_id.setter
    def shard_id(self, value):
        self._shard_id = value

    def content(self):
        """Return the JSON request body."""
        body = {"Action": "Reload"}
        shard = self._shard_id
        if shard:
            body['ShardId'] = shard
        return json.dumps(body)
class AppendFieldParams(RequestParams):
    """
    Request params of append field api.

    The JSON body uses the ``appendfield`` action and carries the field
    name and the ``.value`` of the field type.
    """

    __slots__ = ('_field_name', '_field_type')

    def __init__(self, field_name, field_type):
        self._field_name = field_name
        self._field_type = field_type

    @property
    def field_name(self):
        """Value sent as ``FieldName``."""
        return self._field_name

    @field_name.setter
    def field_name(self, value):
        self._field_name = value

    @property
    def field_type(self):
        """Field type; its ``.value`` is sent as ``FieldType``."""
        return self._field_type

    @field_type.setter
    def field_type(self, value):
        self._field_type = value

    def content(self):
        """Return the JSON request body."""
        payload = {
            "Action": "appendfield",
            "FieldName": self._field_name,
            "FieldType": self._field_type.value
        }
        return json.dumps(payload)
class AppendConnectorFieldParams(RequestParams):
    """
    Request params of append data connector field api.

    The JSON body uses the ``appendfield`` action and carries the field
    name.
    """

    __slots__ = '_field_name'

    def __init__(self, field_name):
        self._field_name = field_name

    @property
    def field_name(self):
        """Value sent as ``FieldName``."""
        return self._field_name

    @field_name.setter
    def field_name(self, value):
        self._field_name = value

    def content(self):
        """Return the JSON request body."""
        payload = {
            "Action": "appendfield",
            "FieldName": self._field_name
        }
        return json.dumps(payload)
class InitAndGetSubscriptionOffsetParams(RequestParams):
    """
    Request params of init and get subscription offset api.

    The JSON body uses the ``open`` action and always carries the shard
    ids.
    """

    __slots__ = '_shard_ids'

    def __init__(self, shard_ids):
        self._shard_ids = shard_ids

    @property
    def shard_ids(self):
        """Value sent as ``ShardIds``."""
        return self._shard_ids

    @shard_ids.setter
    def shard_ids(self, value):
        self._shard_ids = value

    def content(self):
        """Return the JSON request body."""
        payload = {
            "Action": "open",
            "ShardIds": self._shard_ids
        }
        return json.dumps(payload)
class GetSubscriptionOffsetParams(RequestParams):
    """
    Request params of get subscription offset api.

    The JSON body uses the ``get`` action; ``ShardIds`` is only included
    when the shard ids are truthy.
    """

    __slots__ = '_shard_ids'

    def __init__(self, shard_ids):
        self._shard_ids = shard_ids

    @property
    def shard_ids(self):
        """Value sent as ``ShardIds`` when truthy."""
        return self._shard_ids

    @shard_ids.setter
    def shard_ids(self, value):
        self._shard_ids = value

    def content(self):
        """Return the JSON request body."""
        body = {"Action": "get"}
        shards = self._shard_ids
        if shards:
            body['ShardIds'] = shards
        return json.dumps(body)
class UpdateSubscriptionOffsetParams(RequestParams):
    """
    Request params of update subscription offset api.

    The JSON body uses the ``commit`` action and carries, per key of the
    offsets mapping, the ``to_json()`` form of its value.
    """

    __slots__ = '_offsets'

    def __init__(self, offsets):
        self._offsets = offsets

    @property
    def offsets(self):
        """Mapping whose values are serialized with ``to_json()``."""
        return self._offsets

    @offsets.setter
    def offsets(self, value):
        self._offsets = value

    def content(self):
        """Return the JSON request body."""
        serialized = {key: offset.to_json() for key, offset in self._offsets.items()}
        payload = {
            "Action": "commit",
            "Offsets": serialized
        }
        return json.dumps(payload)
class UpdateConnectorStateParams(RequestParams):
    """
    Request params of update connector state.

    The JSON body uses the ``UpdateState`` action and carries the
    ``.value`` of the connector state.
    """

    __slots__ = '_connector_state'

    def __init__(self, connector_state):
        self._connector_state = connector_state

    @property
    def connector_state(self):
        """Connector state; its ``.value`` is sent as ``State``."""
        return self._connector_state

    @connector_state.setter
    def connector_state(self, value):
        self._connector_state = value

    def content(self):
        """Return the JSON request body."""
        payload = {
            'Action': 'UpdateState',
            'State': self._connector_state.value
        }
        return json.dumps(payload)
class UpdateConnectorOffsetParams(RequestParams):
    """
    Request params of update connector offset.

    The JSON body uses the ``UpdateShardContext`` action. ``ShardId`` is
    included when the shard id is truthy; ``CurrentSequence`` and
    ``CurrentTime`` are included when the offset's sequence / timestamp
    are greater than -1.
    """

    __slots__ = ('_shard_id', '_connector_offset')

    def __init__(self, shard_id, connector_offset):
        self._shard_id = shard_id
        self._connector_offset = connector_offset

    @property
    def shard_id(self):
        """Value sent as ``ShardId`` when truthy."""
        return self._shard_id

    @shard_id.setter
    def shard_id(self, value):
        self._shard_id = value

    @property
    def connector_offset(self):
        """Offset object providing ``sequence`` and ``timestamp``."""
        return self._connector_offset

    @connector_offset.setter
    def connector_offset(self, value):
        self._connector_offset = value

    def content(self):
        """Return the JSON request body."""
        body = {'Action': 'UpdateShardContext'}

        if self._shard_id:
            body['ShardId'] = self._shard_id

        offset = self._connector_offset
        if offset.sequence > -1:
            body["CurrentSequence"] = offset.sequence
        if offset.timestamp > -1:
            body["CurrentTime"] = offset.timestamp

        return json.dumps(body)
class UpdateConnectorShardContextParams(RequestParams):
    """
    Request params of update connector shard context.

    The JSON body uses the ``UpdateShardContext`` action and always
    carries the shard id and the current sequence. ``StartSequence`` and
    ``EndSequence`` are only included when they are not negative.
    """

    __slots__ = ('_shard_id', '_start_sequence', '_end_sequence', '_current_sequence')

    def __init__(self, shard_id, start_sequence, end_sequence, current_sequence):
        self._shard_id = shard_id
        self._start_sequence = start_sequence
        self._end_sequence = end_sequence
        self._current_sequence = current_sequence

    @property
    def shard_id(self):
        """Value sent as ``ShardId``."""
        return self._shard_id

    @shard_id.setter
    def shard_id(self, value):
        self._shard_id = value

    @property
    def start_sequence(self):
        """Value sent as ``StartSequence`` when not negative."""
        return self._start_sequence

    @start_sequence.setter
    def start_sequence(self, value):
        self._start_sequence = value

    @property
    def end_sequence(self):
        """Value sent as ``EndSequence`` when not negative."""
        return self._end_sequence

    @end_sequence.setter
    def end_sequence(self, value):
        self._end_sequence = value

    @property
    def current_sequence(self):
        """Value sent as ``CurrentSequence``."""
        return self._current_sequence

    @current_sequence.setter
    def current_sequence(self, value):
        self._current_sequence = value

    def content(self):
        """Return the JSON request body."""
        body = {
            'Action': 'UpdateShardContext',
            'ShardId': self._shard_id,
            'CurrentSequence': self._current_sequence
        }

        start, end = self._start_sequence, self._end_sequence
        if start >= 0:
            body['StartSequence'] = start
        if end >= 0:
            body['EndSequence'] = end

        return json.dumps(body)
class CreateSubscriptionParams(RequestParams):
    """
    Request params of create subscription.

    The JSON body uses the ``create`` action and carries the comment.
    """

    __slots__ = '_comment'

    def __init__(self, comment):
        self._comment = comment

    @property
    def comment(self):
        """Value sent as ``Comment``."""
        return self._comment

    @comment.setter
    def comment(self, value):
        self._comment = value

    def content(self):
        """Return the JSON request body."""
        payload = {
            'Action': 'create',
            'Comment': self._comment
        }
        return json.dumps(payload)
class UpdateSubscriptionParams(RequestParams):
    """
    Request params of update subscription.

    The JSON body only carries the comment.
    """

    __slots__ = '_comment'

    def __init__(self, comment):
        self._comment = comment

    @property
    def comment(self):
        """Value sent as ``Comment``."""
        return self._comment

    @comment.setter
    def comment(self, value):
        self._comment = value

    def content(self):
        """Return the JSON request body."""
        payload = {'Comment': self._comment}
        return json.dumps(payload)
class UpdateSubscriptionStateParams(RequestParams):
    """
    Request params of update subscription state.

    The JSON body only carries the ``.value`` of the state.
    """

    __slots__ = '_state'

    def __init__(self, state):
        self._state = state

    @property
    def state(self):
        """Subscription state; its ``.value`` is sent as ``State``."""
        return self._state

    @state.setter
    def state(self, value):
        self._state = value

    def content(self):
        """Return the JSON request body."""
        payload = {'State': self._state.value}
        return json.dumps(payload)
class ListSubscriptionParams(RequestParams):
    """
    Request params of query subscription.

    The JSON body uses the ``list`` action and carries the page index and
    page size; ``Search`` is only included when the query key is truthy.
    """

    __slots__ = ('_query_key', '_page_index', '_page_size')

    def __init__(self, query_key, page_index, page_size):
        self._query_key = query_key
        self._page_index = page_index
        self._page_size = page_size

    @property
    def page_index(self):
        """Value sent as ``PageIndex``."""
        return self._page_index

    @page_index.setter
    def page_index(self, value):
        self._page_index = value

    @property
    def page_size(self):
        """Value sent as ``PageSize``."""
        return self._page_size

    @page_size.setter
    def page_size(self, value):
        self._page_size = value

    @property
    def query_key(self):
        """Value sent as ``Search`` when truthy."""
        return self._query_key

    @query_key.setter
    def query_key(self, value):
        self._query_key = value

    def content(self):
        """Return the JSON request body."""
        body = {
            'Action': 'list',
            'PageIndex': self._page_index,
            'PageSize': self._page_size
        }
        search = self._query_key
        if search:
            body['Search'] = search
        return json.dumps(body)
class ResetSubscriptionOffsetParams(RequestParams):
    """
    Request params of reset subscription offset.

    The JSON body uses the ``reset`` action and carries, per key of the
    offsets mapping, the ``to_json()`` form of its value.
    """

    __slots__ = '_offsets'

    def __init__(self, offsets):
        self._offsets = offsets

    @property
    def offsets(self):
        """Mapping whose values are serialized with ``to_json()``."""
        return self._offsets

    @offsets.setter
    def offsets(self, value):
        self._offsets = value

    def content(self):
        """Return the JSON request body."""
        serialized = {key: offset.to_json() for key, offset in self._offsets.items()}
        payload = {
            "Action": "reset",
            "Offsets": serialized
        }
        return json.dumps(payload)
class JoinGroupParams(RequestParams):
    """
    Request params of join group.

    The JSON body uses the ``joinGroup`` action and carries the session
    timeout.
    """

    __slots__ = '_session_timeout'

    def __init__(self, session_timeout):
        self._session_timeout = session_timeout

    @property
    def session_timeout(self):
        """Value sent as ``SessionTimeout``."""
        return self._session_timeout

    @session_timeout.setter
    def session_timeout(self, session_timeout):
        self._session_timeout = session_timeout

    def content(self):
        """Return the JSON request body."""
        payload = {
            "Action": "joinGroup",
            "SessionTimeout": self._session_timeout
        }
        return json.dumps(payload)
class HeartBeatParams(RequestParams):
    """
    Request params of heart beat.

    The JSON body uses the ``heartBeat`` action and carries the consumer
    id, the version id, the held shard list and the read-end shard list.
    """

    __slots__ = ('_consumer_id', '_version_id', '_hold_shard_list', '_read_end_shard_list')

    def __init__(self, consumer_id, version_id, hold_shard_list, read_end_shard_list):
        self._consumer_id = consumer_id
        self._version_id = version_id
        self._hold_shard_list = hold_shard_list
        self._read_end_shard_list = read_end_shard_list

    @property
    def consumer_id(self):
        """Value sent as ``ConsumerId``."""
        return self._consumer_id

    @consumer_id.setter
    def consumer_id(self, consumer_id):
        self._consumer_id = consumer_id

    @property
    def version_id(self):
        """Value sent as ``VersionId``."""
        return self._version_id

    @version_id.setter
    def version_id(self, version_id):
        self._version_id = version_id

    @property
    def hold_shard_list(self):
        """Value sent as ``HoldShardList``."""
        return self._hold_shard_list

    @hold_shard_list.setter
    def hold_shard_list(self, hold_shard_list):
        self._hold_shard_list = hold_shard_list

    @property
    def read_end_shard_list(self):
        """Value sent as ``ReadEndShardList``."""
        return self._read_end_shard_list

    @read_end_shard_list.setter
    def read_end_shard_list(self, read_end_shard_list):
        self._read_end_shard_list = read_end_shard_list

    def content(self):
        """Return the JSON request body."""
        payload = {
            "Action": "heartBeat",
            "ConsumerId": self._consumer_id,
            "VersionId": self._version_id,
            "HoldShardList": self._hold_shard_list,
            "ReadEndShardList": self._read_end_shard_list
        }
        return json.dumps(payload)
class SyncGroupParams(RequestParams):
    """
    Request params of sync group.

    The JSON body uses the ``syncGroup`` action and carries the consumer
    id, the version id, the released shard list and the read-end shard
    list.
    """

    __slots__ = ('_consumer_id', '_version_id', '_release_shard_list', '_read_end_shard_list')

    def __init__(self, consumer_id, version_id, release_shard_list, read_end_shard_list):
        self._consumer_id = consumer_id
        self._version_id = version_id
        self._release_shard_list = release_shard_list
        self._read_end_shard_list = read_end_shard_list

    @property
    def consumer_id(self):
        """Value sent as ``ConsumerId``."""
        return self._consumer_id

    @consumer_id.setter
    def consumer_id(self, consumer_id):
        self._consumer_id = consumer_id

    @property
    def version_id(self):
        """Value sent as ``VersionId``."""
        return self._version_id

    @version_id.setter
    def version_id(self, version_id):
        self._version_id = version_id

    @property
    def release_shard_list(self):
        """Value sent as ``ReleaseShardList``."""
        return self._release_shard_list

    @release_shard_list.setter
    def release_shard_list(self, release_shard_list):
        self._release_shard_list = release_shard_list

    @property
    def read_end_shard_list(self):
        """Value sent as ``ReadEndShardList``."""
        return self._read_end_shard_list

    @read_end_shard_list.setter
    def read_end_shard_list(self, read_end_shard_list):
        self._read_end_shard_list = read_end_shard_list

    def content(self):
        """Return the JSON request body."""
        payload = {
            "Action": "syncGroup",
            "ConsumerId": self._consumer_id,
            "VersionId": self._version_id,
            "ReleaseShardList": self._release_shard_list,
            "ReadEndShardList": self._read_end_shard_list
        }
        return json.dumps(payload)
class LeaveGroupParams(RequestParams):
    """
    Request params of leave group.

    The JSON body uses the ``leaveGroup`` action and carries the consumer
    id and the version id.
    """

    __slots__ = ('_consumer_id', '_version_id')

    def __init__(self, consumer_id, version_id):
        self._consumer_id = consumer_id
        self._version_id = version_id

    @property
    def consumer_id(self):
        """Value sent as ``ConsumerId``."""
        return self._consumer_id

    @consumer_id.setter
    def consumer_id(self, consumer_id):
        self._consumer_id = consumer_id

    @property
    def version_id(self):
        """Value sent as ``VersionId``."""
        return self._version_id

    @version_id.setter
    def version_id(self, version_id):
        self._version_id = version_id

    def content(self):
        """Return the JSON request body."""
        payload = {
            "Action": "leaveGroup",
            "ConsumerId": self._consumer_id,
            "VersionId": self._version_id,
        }
        return json.dumps(payload)
class ListTopicSchemaParams(RequestParams):
    """
    Request params of list topic schema.

    The JSON body uses the ``ListSchema`` action and carries the page
    number and the page size.
    """

    __slots__ = ('_page_number', '_page_size')

    def __init__(self, page_number, page_size):
        self._page_number = page_number
        self._page_size = page_size

    @property
    def page_number(self):
        """Value sent as ``PageNumber``."""
        return self._page_number

    @page_number.setter
    def page_number(self, page_number):
        self._page_number = page_number

    @property
    def page_size(self):
        """Value sent as ``PageSize``."""
        return self._page_size

    @page_size.setter
    def page_size(self, page_size):
        self._page_size = page_size

    def content(self):
        """Return the JSON request body."""
        payload = {
            "Action": "ListSchema",
            "PageNumber": self._page_number,
            "PageSize": self._page_size
        }
        return json.dumps(payload)
class GetTopicSchemaParams(RequestParams):
    """
    Request params of get topic schema.

    The JSON body uses the ``GetSchema`` action and carries the version
    id plus the JSON string of the record schema, or an empty string when
    the schema is falsy.
    """

    __slots__ = ('_version_id', '_record_schema')

    def __init__(self, version_id, record_schema):
        self._version_id = version_id
        self._record_schema = record_schema

    @property
    def version_id(self):
        """Value sent as ``VersionId``."""
        return self._version_id

    @version_id.setter
    def version_id(self, version_id):
        self._version_id = version_id

    @property
    def record_schema(self):
        """Schema whose ``to_json_string()`` is sent as ``RecordSchema``."""
        return self._record_schema

    @record_schema.setter
    def record_schema(self, record_schema):
        self._record_schema = record_schema

    def content(self):
        """Return the JSON request body."""
        schema = self._record_schema
        if schema:
            schema_json = schema.to_json_string()
        else:
            schema_json = ""
        payload = {
            "Action": "GetSchema",
            "VersionId": self._version_id,
            "RecordSchema": schema_json
        }
        return json.dumps(payload)
class RegisterTopicSchemaParams(RequestParams):
    """
    Request params of register topic schema.

    The JSON body uses the ``RegisterSchema`` action and carries the JSON
    string of the record schema, or an empty string when the schema is
    falsy.
    """

    __slots__ = '_record_schema'

    def __init__(self, record_schema):
        self._record_schema = record_schema

    @property
    def record_schema(self):
        """Schema whose ``to_json_string()`` is sent as ``RecordSchema``."""
        return self._record_schema

    @record_schema.setter
    def record_schema(self, record_schema):
        self._record_schema = record_schema

    def content(self):
        """Return the JSON request body."""
        schema = self._record_schema
        if schema:
            schema_json = schema.to_json_string()
        else:
            schema_json = ""
        payload = {
            "Action": "RegisterSchema",
            "RecordSchema": schema_json
        }
        return json.dumps(payload)
class DeleteTopicSchemaParams(RequestParams):
    """
    Request params of delete topic schema.

    The JSON body uses the ``DeleteSchema`` action and carries the version
    id.
    """

    # Only the version id is stored; the previously declared
    # '_record_schema' slot was never assigned or read by this class.
    __slots__ = '_version_id'

    def __init__(self, version_id):
        """
        :param version_id: value sent as ``VersionId``
        """
        self._version_id = version_id

    @property
    def version_id(self):
        """Value sent as ``VersionId``."""
        return self._version_id

    @version_id.setter
    def version_id(self, version_id):
        self._version_id = version_id

    def content(self):
        """Return the JSON request body."""
        return json.dumps({
            "Action": "DeleteSchema",
            "VersionId": self._version_id
        })