#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import absolute_import

import abc
import json

import six
from cprotobuf.internal import encode_data

from ..batch.batch_serializer import BatchSerializer
from ..batch.utils import SchemaObject
from ..models import CursorType, RecordType, RecordSchema
from ..proto.datahub_record_proto_pb import PutRecordsRequest, GetRecordsRequest, PutBinaryRecordsRequest
from ..rest import ContentType, Headers
from ..utils import pb_message_wrap


@six.add_metaclass(abc.ABCMeta)
class RequestParams(object):
    """
    Abstract class to be implement
    """

    @abc.abstractmethod
    def content(self):
        pass

    @staticmethod
    def extra_headers():
        return {}

    def __repr__(self):
        return self.content()


class CreateProjectRequestParams(RequestParams):
    """
    Request params of create project api
    """

    __slots__ = '_comment'

    def __init__(self, comment):
        self._comment = comment

    @property
    def comment(self):
        return self._comment

    @comment.setter
    def comment(self, value):
        self._comment = value

    def content(self):
        return json.dumps({
            "Comment": self._comment
        })


class UpdateProjectRequestParams(RequestParams):
    """
    Request params of update project api
    """

    __slots__ = '_comment'

    def __init__(self, comment):
        self._comment = comment

    @property
    def comment(self):
        return self._comment

    @comment.setter
    def comment(self, value):
        self._comment = value

    def content(self):
        return json.dumps({
            "Comment": self._comment
        })


class CreateTopicRequestParams(RequestParams):
    """
    Request params of create topic api
    """

    __slots__ = ('_shard_count', '_life_cycle', '_record_type', '_record_schema',
                 '_extend_mode', '_comment')

    def __init__(self, shard_count, life_cycle, record_type, record_schema, extend_mode, comment):
        self._shard_count = shard_count
        self._life_cycle = life_cycle
        self._record_type = record_type
        self._record_schema = record_schema
        self._extend_mode = extend_mode
        self._comment = comment

    @property
    def shard_count(self):
        return self._shard_count

    @shard_count.setter
    def shard_count(self, value):
        self._shard_count = value

    @property
    def life_cycle(self):
        return self._life_cycle

    @life_cycle.setter
    def life_cycle(self, value):
        self._life_cycle = value

    @property
    def record_type(self):
        return self._record_type

    @record_type.setter
    def record_type(self, value):
        self._record_type = value

    @property
    def record_schema(self):
        return self._record_schema

    @record_schema.setter
    def record_schema(self, value):
        self._record_schema = value

    @property
    def extend_mode(self):
        return self._extend_mode

    @extend_mode.setter
    def extend_mode(self, value):
        self._extend_mode = value

    @property
    def comment(self):
        return self._comment

    @comment.setter
    def comment(self, value):
        self._comment = value

    def content(self):
        data = {
            "ShardCount": self._shard_count,
            "Lifecycle": self._life_cycle,
            "RecordType": self._record_type.value,
            "Comment": self._comment
        }

        if RecordType.TUPLE == self._record_type:
            if isinstance(self._record_schema, RecordSchema):
                data['RecordSchema'] = self._record_schema.to_json_string()
            else:
                data['RecordSchema'] = self._record_schema

        if self._extend_mode is not None:
            data['ExpandMode'] = 'extend' if self._extend_mode else 'split'

        return json.dumps(data)


class UpdateTopicRequestParams(RequestParams):
    """
    Request params of update topic api
    """

    __slots__ = ('_life_cycle', '_comment')

    def __init__(self, life_cycle, comment):
        self._life_cycle = life_cycle
        self._comment = comment

    @property
    def life_cycle(self):
        return self._life_cycle

    @life_cycle.setter
    def life_cycle(self, value):
        self._life_cycle = value

    @property
    def comment(self):
        return self._comment

    @comment.setter
    def comment(self, value):
        self._comment = value

    def content(self):
        return json.dumps({
            "Lifecycle": self._life_cycle,
            "Comment": self._comment
        })


class MergeShardRequestParams(RequestParams):
    """
    Request params of merge shard api
    """

    __slots__ = ('_shard_id', '_adj_shard_id')

    def __init__(self, shard_id, adj_shard_id):
        self._shard_id = shard_id
        self._adj_shard_id = adj_shard_id

    @property
    def shard_id(self):
        return self._shard_id

    @shard_id.setter
    def shard_id(self, value):
        self._shard_id = value

    @property
    def adj_shard_id(self):
        return self._adj_shard_id

    @adj_shard_id.setter
    def adj_shard_id(self, value):
        self._adj_shard_id = value

    def content(self):
        return json.dumps({
            "Action": "merge",
            "ShardId": self._shard_id,
            "AdjacentShardId": self._adj_shard_id
        })


class SplitShardRequestParams(RequestParams):
    """
    Request params of split shard api
    """

    __slots__ = ('_shard_id', '_split_key')

    def __init__(self, shard_id, split_key):
        self._shard_id = shard_id
        self._split_key = split_key

    @property
    def shard_id(self):
        return self._shard_id

    @shard_id.setter
    def shard_id(self, value):
        self._shard_id = value

    @property
    def split_key(self):
        return self._split_key

    @split_key.setter
    def split_key(self, value):
        self._split_key = value

    def content(self):
        return json.dumps({
            "Action": "split",
            "ShardId": self._shard_id,
            "SplitKey": self._split_key
        })


class ExtendShardRequestParams(RequestParams):
    """
    Request params of extend shard api
    """

    __slots__ = ('_shard_count')

    def __init__(self, shard_count):
        self._shard_count = shard_count

    @property
    def shard_count(self):
        return self._shard_count

    @shard_count.setter
    def shard_count(self, value):
        self._shard_count = value

    def content(self):
        return json.dumps({
            "Action": "extend",
            "ExtendMode": "TO",
            "ShardNumber": self._shard_count
        })


class GetCursorRequestParams(RequestParams):
    """
    Request params of get cursor api
    """

    __slots__ = ('_type', '_param')

    def __init__(self, cursor_type, param):
        self._type = cursor_type
        self._param = param

    @property
    def type(self):
        return self._type

    @type.setter
    def type(self, value):
        self._type = value

    @property
    def param(self):
        return self._param

    @param.setter
    def param(self, value):
        self._param = value

    def content(self):
        data = {
            "Action": "cursor",
            "Type": self._type.name
        }
        if CursorType.SYSTEM_TIME == self._type:
            data['SystemTime'] = self._param
        elif CursorType.SEQUENCE == self._type:
            data['Sequence'] = self._param
        return json.dumps(data)


class PutRecordsRequestParams(RequestParams):
    """
    Request params of put records api
    """

    __slots__ = '_record_list'

    action = 'pub'

    def __init__(self, record_list):
        self._record_list = record_list

    @property
    def record_list(self):
        return self._record_list

    @record_list.setter
    def record_list(self, value):
        self._record_list = value

    def content(self):
        return json.dumps({
            "Action": PutRecordsRequestParams.action,
            "Records": [record.to_json() for record in self._record_list]
        })


class PutPBRecordsRequestParams(PutRecordsRequestParams):
    """
    Protobuf Request params of put records api
    """
    def content(self):
        pb_put_record_request = {
            'records': []
        }
        for record in self._record_list:
            pb_put_record_request['records'].append(record.to_pb_record_entry())
        pb_data = encode_data(PutRecordsRequest, pb_put_record_request)
        return pb_message_wrap(pb_data)

    @staticmethod
    def extra_headers():
        return {
            Headers.REQUEST_ACTION: PutRecordsRequestParams.action,
            Headers.CONTENT_TYPE: ContentType.HTTP_PROTOBUF.value
        }


class PutBatchRecordsRequestParams(PutRecordsRequestParams):
    """
    Batch Request params of put records api
    """

    def __init__(self, record_list, project_name, topic_name, compress_type, schema_register):
        super().__init__(record_list)
        self._project_name = project_name
        self._topic_name = topic_name
        self._compress_type = compress_type
        self._schema_register = schema_register

    def content(self):
        schema_object = SchemaObject(self._project_name, self._topic_name, self._schema_register)
        record_data = BatchSerializer.serialize(self._compress_type, schema_object, self._record_list)
        batch_put_record_request = {
            'records': [{'data': record_data}]
        }
        batch_data = encode_data(PutBinaryRecordsRequest, batch_put_record_request)
        return pb_message_wrap(batch_data)

    @staticmethod
    def extra_headers():
        return {
            Headers.REQUEST_ACTION: PutRecordsRequestParams.action,
            Headers.CONTENT_TYPE: ContentType.HTTP_BATCH.value
        }


class GetRecordsRequestParams(RequestParams):
    """
    Request params of get records api
    """

    __slots__ = ('_cursor', '_limit_num')

    action = 'sub'

    def __init__(self, cursor, limit_num):
        self._cursor = cursor
        self._limit_num = limit_num

    @property
    def cursor(self):
        return self._cursor

    @cursor.setter
    def cursor(self, value):
        self._cursor = value

    @property
    def limit_num(self):
        return self._limit_num

    @limit_num.setter
    def limit_num(self, value):
        self._limit_num = value

    def content(self):
        return json.dumps({
            "Action": GetRecordsRequestParams.action,
            "Cursor": self._cursor,
            "Limit": self._limit_num
        })


class GetPBRecordsRequestParams(GetRecordsRequestParams):
    """
    Protobuf Request params of get records api
    """

    def content(self):
        pb_get_record_request = {
            'cursor': self._cursor,
            'limit': self._limit_num
        }
        pb_data = encode_data(GetRecordsRequest, pb_get_record_request)
        return pb_message_wrap(pb_data)

    @staticmethod
    def extra_headers(sub_id=None):
        header = {
            Headers.REQUEST_ACTION: GetRecordsRequestParams.action,
            Headers.CONTENT_TYPE: ContentType.HTTP_PROTOBUF.value
        }
        if sub_id is not None:
            header[Headers.CONTENT_SUB_ID] = sub_id
        return header


class GetBatchRecordsRequestParams(GetRecordsRequestParams):
    """
    Batch Request params of get records api
    """
    def content(self):
        batch_get_record_request = {
            'cursor': self._cursor,
            'limit': self._limit_num
        }
        batch_data = encode_data(GetRecordsRequest, batch_get_record_request)
        return pb_message_wrap(batch_data)

    @staticmethod
    def extra_headers(sub_id=None):
        header = {
            Headers.REQUEST_ACTION: GetRecordsRequestParams.action,
            Headers.CONTENT_TYPE: ContentType.HTTP_BATCH.value
        }
        if sub_id is not None:
            header[Headers.CONTENT_SUB_ID] = sub_id
        return header


class GetMeteringInfoRequestParams(RequestParams):
    """
    Request params of get metering info api
    """

    def content(self):
        return json.dumps({
            "Action": "meter"
        })


class CreateConnectorParams(RequestParams):
    """
    Request params of create data connector api
    """

    __slots__ = ('_column_fields', '_config', '_start_time')

    def __init__(self, column_fields, config, start_time):
        self._column_fields = column_fields
        if not self._column_fields:
            self._column_fields = []
        self._config = config
        self._start_time = start_time

    @property
    def column_fields(self):
        return self._column_fields

    @column_fields.setter
    def column_fields(self, value):
        self._column_fields = value

    @property
    def config(self):
        return self._config

    @config.setter
    def config(self, value):
        self._config = value

    @property
    def start_time(self):
        return self._start_time

    @start_time.setter
    def start_time(self, value):
        self._start_time = value

    def content(self):
        return json.dumps({
            "Action": "Create",
            "ColumnFields": self._column_fields,
            "Config": self._config.to_json(),
            "SinkStartTime": self._start_time
        })


class UpdateConnectorParams(RequestParams):
    """
    Request params of update data connector config api
    """

    __slots__ = '_config'

    def __init__(self, config):
        self._config = config

    @property
    def config(self):
        return self._config

    @config.setter
    def config(self, value):
        self._config = value

    def content(self):
        return json.dumps({
            "Action": "updateconfig",
            "Config": self._config.to_json()
        })


class GetConnectorShardStatusParams(RequestParams):
    """
    Request params of get data connector shard status api
    """

    __slots__ = '_shard_id'

    def __init__(self, shard_id):
        self._shard_id = shard_id

    @property
    def shard_id(self):
        return self._shard_id

    @shard_id.setter
    def shard_id(self, value):
        self._shard_id = value

    def content(self):
        data = {
            "Action": "Status"
        }
        if self._shard_id:
            data['ShardId'] = self._shard_id
        return json.dumps(data)


class ReloadConnectorParams(RequestParams):
    """
    Request params of get data connector shard status api
    """

    __slots__ = '_shard_id'

    def __init__(self, shard_id):
        self._shard_id = shard_id

    @property
    def shard_id(self):
        return self._shard_id

    @shard_id.setter
    def shard_id(self, value):
        self._shard_id = value

    def content(self):
        data = {
            "Action": "Reload"
        }
        if self._shard_id:
            data['ShardId'] = self._shard_id
        return json.dumps(data)


class AppendFieldParams(RequestParams):
    """
    Request params of append field api
    """

    __slots__ = ('_field_name', '_field_type')

    def __init__(self, field_name, field_type):
        self._field_name = field_name
        self._field_type = field_type

    @property
    def field_name(self):
        return self._field_name

    @field_name.setter
    def field_name(self, value):
        self._field_name = value

    @property
    def field_type(self):
        return self._field_type

    @field_type.setter
    def field_type(self, value):
        self._field_type = value

    def content(self):
        return json.dumps({
            "Action": "appendfield",
            "FieldName": self._field_name,
            "FieldType": self._field_type.value
        })


class AppendConnectorFieldParams(RequestParams):
    """
    Request params of append data connector field api
    """

    __slots__ = '_field_name'

    def __init__(self, field_name):
        self._field_name = field_name

    @property
    def field_name(self):
        return self._field_name

    @field_name.setter
    def field_name(self, value):
        self._field_name = value

    def content(self):
        return json.dumps({
            "Action": "appendfield",
            "FieldName": self._field_name
        })


class InitAndGetSubscriptionOffsetParams(RequestParams):
    """
    Request params of init and get subscription offset api
    """

    __slots__ = '_shard_ids'

    def __init__(self, shard_ids):
        self._shard_ids = shard_ids

    @property
    def shard_ids(self):
        return self._shard_ids

    @shard_ids.setter
    def shard_ids(self, value):
        self._shard_ids = value

    def content(self):
        return json.dumps({
            "Action": "open",
            "ShardIds": self._shard_ids
        })


class GetSubscriptionOffsetParams(RequestParams):
    """
    Request params of get subscription offset api
    """

    __slots__ = '_shard_ids'

    def __init__(self, shard_ids):
        self._shard_ids = shard_ids

    @property
    def shard_ids(self):
        return self._shard_ids

    @shard_ids.setter
    def shard_ids(self, value):
        self._shard_ids = value

    def content(self):
        data = {
            "Action": "get"
        }
        if self._shard_ids:
            data['ShardIds'] = self._shard_ids
        return json.dumps(data)


class UpdateSubscriptionOffsetParams(RequestParams):
    """
    Request params of update subscription offset api
    """

    __slots__ = '_offsets'

    def __init__(self, offsets):
        self._offsets = offsets

    @property
    def offsets(self):
        return self._offsets

    @offsets.setter
    def offsets(self, value):
        self._offsets = value

    def content(self):
        offsets = {}
        for (k, v) in self._offsets.items():
            offsets.update({
                k: v.to_json()
            })
        return json.dumps({
            "Action": "commit",
            "Offsets": offsets
        })


class UpdateConnectorStateParams(RequestParams):
    """
    Request params of update connector state
    """

    __slots__ = '_connector_state'

    def __init__(self, connector_state):
        self._connector_state = connector_state

    @property
    def connector_state(self):
        return self._connector_state

    @connector_state.setter
    def connector_state(self, value):
        self._connector_state = value

    def content(self):
        return json.dumps({
            'Action': 'UpdateState',
            'State': self._connector_state.value
        })


class UpdateConnectorOffsetParams(RequestParams):
    """
    Request params of update connector state
    """

    __slots__ = ('_shard_id', '_connector_offset')

    def __init__(self, shard_id, connector_offset):
        self._shard_id = shard_id
        self._connector_offset = connector_offset

    @property
    def shard_id(self):
        return self._shard_id

    @shard_id.setter
    def shard_id(self, value):
        self._shard_id = value

    @property
    def connector_offset(self):
        return self._connector_offset

    @connector_offset.setter
    def connector_offset(self, value):
        self._connector_offset = value

    def content(self):
        data = {
            'Action': 'UpdateShardContext'
        }
        if self._shard_id:
            data['ShardId'] = self._shard_id
        if self._connector_offset.sequence > -1:
            data["CurrentSequence"] = self._connector_offset.sequence
        if self._connector_offset.timestamp > -1:
            data["CurrentTime"] = self._connector_offset.timestamp

        return json.dumps(data)


class UpdateConnectorShardContextParams(RequestParams):
    """
    Request params of update connector shard context
    """

    __slots__ = ('_shard_id', '_start_sequence', '_end_sequence', '_current_sequence')

    def __init__(self, shard_id, start_sequence, end_sequence, current_sequence):
        self._shard_id = shard_id
        self._start_sequence = start_sequence
        self._end_sequence = end_sequence
        self._current_sequence = current_sequence

    @property
    def shard_id(self):
        return self._shard_id

    @shard_id.setter
    def shard_id(self, value):
        self._shard_id = value

    @property
    def start_sequence(self):
        return self._start_sequence

    @start_sequence.setter
    def start_sequence(self, value):
        self._start_sequence = value

    @property
    def end_sequence(self):
        return self._end_sequence

    @end_sequence.setter
    def end_sequence(self, value):
        self._end_sequence = value

    @property
    def current_sequence(self):
        return self._current_sequence

    @current_sequence.setter
    def current_sequence(self, value):
        self._current_sequence = value

    def content(self):
        data = {
            'Action': 'UpdateShardContext',
            'ShardId': self._shard_id,
            'CurrentSequence': self._current_sequence
        }
        if self._start_sequence >= 0:
            data['StartSequence'] = self._start_sequence
        if self._end_sequence >= 0:
            data['EndSequence'] = self._end_sequence
        return json.dumps(data)


class CreateSubscriptionParams(RequestParams):
    """
    Request params of create subscription
    """

    __slots__ = '_comment'

    def __init__(self, comment):
        self._comment = comment

    @property
    def comment(self):
        return self._comment

    @comment.setter
    def comment(self, value):
        self._comment = value

    def content(self):
        return json.dumps({
            'Action': 'create',
            'Comment': self._comment
        })


class UpdateSubscriptionParams(RequestParams):
    """
    Request params of update subscription
    """

    __slots__ = '_comment'

    def __init__(self, comment):
        self._comment = comment

    @property
    def comment(self):
        return self._comment

    @comment.setter
    def comment(self, value):
        self._comment = value

    def content(self):
        return json.dumps({
            'Comment': self._comment
        })


class UpdateSubscriptionStateParams(RequestParams):
    """
    Request params of update subscription state
    """

    __slots__ = '_state'

    def __init__(self, state):
        self._state = state

    @property
    def state(self):
        return self._state

    @state.setter
    def state(self, value):
        self._state = value

    def content(self):
        return json.dumps({
            'State': self._state.value
        })


class ListSubscriptionParams(RequestParams):
    """
    Request params of query subscription
    """

    __slots__ = ('_query_key', '_page_index', '_page_size')

    def __init__(self, query_key, page_index, page_size):
        self._page_index = page_index
        self._page_size = page_size
        self._query_key = query_key

    @property
    def page_index(self):
        return self._page_index

    @page_index.setter
    def page_index(self, value):
        self._page_index = value

    @property
    def page_size(self):
        return self._page_size

    @page_size.setter
    def page_size(self, value):
        self._page_size = value

    @property
    def query_key(self):
        return self._query_key

    @query_key.setter
    def query_key(self, value):
        self._query_key = value

    def content(self):
        data = {
            'Action': 'list',
            'PageIndex': self._page_index,
            'PageSize': self._page_size
        }
        if self._query_key:
            data['Search'] = self._query_key
        return json.dumps(data)


class ResetSubscriptionOffsetParams(RequestParams):
    """
    Request params of reset subscription offset
    """

    __slots__ = '_offsets'

    def __init__(self, offsets):
        self._offsets = offsets

    @property
    def offsets(self):
        return self._offsets

    @offsets.setter
    def offsets(self, value):
        self._offsets = value

    def content(self):
        offsets = {}
        for (k, v) in self._offsets.items():
            offsets.update({
                k: v.to_json()
            })
        return json.dumps({
            "Action": "reset",
            "Offsets": offsets
        })


class JoinGroupParams(RequestParams):
    """
    Request params of join group
    """

    __slots__ = '_session_timeout'

    def __init__(self, session_timeout):
        self._session_timeout = session_timeout

    @property
    def session_timeout(self):
        return self._session_timeout

    @session_timeout.setter
    def session_timeout(self, session_timeout):
        self._session_timeout = session_timeout

    def content(self):
        return json.dumps({
            "Action": "joinGroup",
            "SessionTimeout": self._session_timeout
        })


class HeartBeatParams(RequestParams):
    """
    Request params of heart beat
    """

    __slots__ = '_consumer_id', '_version_id', '_hold_shard_list', '_read_end_shard_list'

    def __init__(self, consumer_id, version_id, hold_shard_list, read_end_shard_list):
        self._consumer_id = consumer_id
        self._version_id = version_id
        self._hold_shard_list = hold_shard_list
        self._read_end_shard_list = read_end_shard_list

    @property
    def consumer_id(self):
        return self._consumer_id

    @consumer_id.setter
    def consumer_id(self, consumer_id):
        self._consumer_id = consumer_id

    @property
    def version_id(self):
        return self._version_id

    @version_id.setter
    def version_id(self, version_id):
        self._version_id = version_id

    @property
    def hold_shard_list(self):
        return self._hold_shard_list

    @hold_shard_list.setter
    def hold_shard_list(self, hold_shard_list):
        self._hold_shard_list = hold_shard_list

    @property
    def read_end_shard_list(self):
        return self._read_end_shard_list

    @read_end_shard_list.setter
    def read_end_shard_list(self, read_end_shard_list):
        self._read_end_shard_list = read_end_shard_list

    def content(self):
        return json.dumps({
            "Action": "heartBeat",
            "ConsumerId": self._consumer_id,
            "VersionId": self._version_id,
            "HoldShardList": self._hold_shard_list,
            "ReadEndShardList": self._read_end_shard_list
        })


class SyncGroupParams(RequestParams):
    """
    Request params of sync group
    """

    __slots__ = '_consumer_id', '_version_id', '_release_shard_list', '_read_end_shard_list'

    def __init__(self, consumer_id, version_id, release_shard_list, read_end_shard_list):
        self._consumer_id = consumer_id
        self._version_id = version_id
        self._release_shard_list = release_shard_list
        self._read_end_shard_list = read_end_shard_list

    @property
    def consumer_id(self):
        return self._consumer_id

    @consumer_id.setter
    def consumer_id(self, consumer_id):
        self._consumer_id = consumer_id

    @property
    def version_id(self):
        return self._version_id

    @version_id.setter
    def version_id(self, version_id):
        self._version_id = version_id

    @property
    def release_shard_list(self):
        return self._release_shard_list

    @release_shard_list.setter
    def release_shard_list(self, release_shard_list):
        self._release_shard_list = release_shard_list

    @property
    def read_end_shard_list(self):
        return self._read_end_shard_list

    @read_end_shard_list.setter
    def read_end_shard_list(self, read_end_shard_list):
        self._read_end_shard_list = read_end_shard_list

    def content(self):
        return json.dumps({
            "Action": "syncGroup",
            "ConsumerId": self._consumer_id,
            "VersionId": self._version_id,
            "ReleaseShardList": self._release_shard_list,
            "ReadEndShardList": self._read_end_shard_list
        })


class LeaveGroupParams(RequestParams):
    """
    Request params of leave group
    """

    __slots__ = '_consumer_id', '_version_id'

    def __init__(self, consumer_id, version_id):
        self._consumer_id = consumer_id
        self._version_id = version_id

    @property
    def consumer_id(self):
        return self._consumer_id

    @consumer_id.setter
    def consumer_id(self, consumer_id):
        self._consumer_id = consumer_id

    @property
    def version_id(self):
        return self._version_id

    @version_id.setter
    def version_id(self, version_id):
        self._version_id = version_id

    def content(self):
        return json.dumps({
            "Action": "leaveGroup",
            "ConsumerId": self._consumer_id,
            "VersionId": self._version_id,
        })


class ListTopicSchemaParams(RequestParams):
    """
    Request params of list topic schema
    """
    __slots__ = '_page_number', '_page_size'

    def __init__(self, page_number, page_size):
        self._page_number = page_number
        self._page_size = page_size

    @property
    def page_number(self):
        return self._page_number

    @page_number.setter
    def page_number(self, page_number):
        self._page_number = page_number

    @property
    def page_size(self):
        return self._page_size

    @page_size.setter
    def page_size(self, page_size):
        self._page_size = page_size

    def content(self):
        return json.dumps({
            "Action": "ListSchema",
            "PageNumber": self._page_number,
            "PageSize": self._page_size
        })


class GetTopicSchemaParams(RequestParams):
    """
    Request params of get topic schema
    """
    __slots__ = '_version_id', '_record_schema'

    def __init__(self, version_id, record_schema):
        self._version_id = version_id
        self._record_schema = record_schema

    @property
    def version_id(self):
        return self._version_id

    @version_id.setter
    def version_id(self, version_id):
        self._version_id = version_id

    @property
    def record_schema(self):
        return self._record_schema

    @record_schema.setter
    def record_schema(self, record_schema):
        self._record_schema = record_schema

    def content(self):
        return json.dumps({
            "Action": "GetSchema",
            "VersionId": self._version_id,
            "RecordSchema": self._record_schema.to_json_string() if self._record_schema else ""
        })


class RegisterTopicSchemaParams(RequestParams):
    """
    Request params of register topic schema
    """
    __slots__ = '_record_schema'

    def __init__(self, record_schema):
        self._record_schema = record_schema

    @property
    def record_schema(self):
        return self._record_schema

    @record_schema.setter
    def record_schema(self, record_schema):
        self._record_schema = record_schema

    def content(self):
        return json.dumps({
            "Action": "RegisterSchema",
            "RecordSchema": self._record_schema.to_json_string() if self._record_schema else ""
        })


class DeleteTopicSchemaParams(RequestParams):
    """
    Request params of delete topic schema
    """
    __slots__ = '_version_id', '_record_schema'

    def __init__(self, version_id):
        self._version_id = version_id

    @property
    def version_id(self):
        return self._version_id

    @version_id.setter
    def version_id(self, version_id):
        self._version_id = version_id

    def content(self):
        return json.dumps({
            "Action": "DeleteSchema",
            "VersionId": self._version_id
        })
