datahub/resultmodel.go (1,024 lines of code) (raw):
package datahub
import (
"encoding/json"
"fmt"
"net/http"
"github.com/golang/protobuf/proto"
"github.com/aliyun/aliyun-datahub-sdk-go/datahub/pbmodel"
"github.com/aliyun/aliyun-datahub-sdk-go/datahub/util"
)
// for the common response and detect error
type CommonResponseResult struct {
// StatusCode http return code
StatusCode int
// Request body raw size
RawSize int
// Request body size after compress
ReqSize int
// RequestId examples request id return by server
RequestId string
}
func newCommonResponseResult(code int, header *http.Header, body []byte) (*CommonResponseResult, error) {
result := &CommonResponseResult{
StatusCode: code,
RequestId: header.Get(httpHeaderRequestId),
}
var err error
switch {
case code >= 400:
var datahubErr DatahubClientError
if err = json.Unmarshal(body, &datahubErr); err != nil {
return nil, err
}
err = errorHandler(code, result.RequestId, datahubErr.Code, datahubErr.Message)
default:
err = nil
}
return result, err
}
// the result of ListProject
type ListProjectResult struct {
CommonResponseResult
ProjectNames []string `json:"ProjectNames"`
}
// convert the response body to ListProjectResult
func NewListProjectResult(data []byte, commonResp *CommonResponseResult) (*ListProjectResult, error) {
lpr := &ListProjectResult{
CommonResponseResult: *commonResp,
ProjectNames: make([]string, 0),
}
if err := json.Unmarshal(data, lpr); err != nil {
return nil, err
}
return lpr, nil
}
type CreateProjectResult struct {
CommonResponseResult
}
func NewCreateProjectResult(commonResp *CommonResponseResult) (*CreateProjectResult, error) {
cpr := &CreateProjectResult{
CommonResponseResult: *commonResp,
}
return cpr, nil
}
type UpdateProjectResult struct {
CommonResponseResult
}
func NewUpdateProjectResult(commonResp *CommonResponseResult) (*UpdateProjectResult, error) {
upr := &UpdateProjectResult{
CommonResponseResult: *commonResp,
}
return upr, nil
}
type DeleteProjectResult struct {
CommonResponseResult
}
func NewDeleteProjectResult(commonResp *CommonResponseResult) (*DeleteProjectResult, error) {
dpr := &DeleteProjectResult{
CommonResponseResult: *commonResp,
}
return dpr, nil
}
// the result of GetProject
type GetProjectResult struct {
CommonResponseResult
ProjectName string
CreateTime int64 `json:"CreateTime"`
LastModifyTime int64 `json:"LastModifyTime"`
Comment string `json:"Comment"`
}
// convert the response body to GetProjectResult
func NewGetProjectResult(data []byte, commonResp *CommonResponseResult) (*GetProjectResult, error) {
gpr := &GetProjectResult{
CommonResponseResult: *commonResp,
}
if err := json.Unmarshal(data, gpr); err != nil {
return nil, err
}
return gpr, nil
}
type UpdateProjectVpcWhitelistResult struct {
CommonResponseResult
}
func NewUpdateProjectVpcWhitelistResult(commonResp *CommonResponseResult) (*UpdateProjectVpcWhitelistResult, error) {
upvw := &UpdateProjectVpcWhitelistResult{
CommonResponseResult: *commonResp,
}
return upvw, nil
}
type ListTopicResult struct {
CommonResponseResult
TopicNames []string `json:"TopicNames"`
}
func NewListTopicResult(data []byte, commonResp *CommonResponseResult) (*ListTopicResult, error) {
lt := &ListTopicResult{
CommonResponseResult: *commonResp,
}
if err := json.Unmarshal(data, lt); err != nil {
return nil, err
}
return lt, nil
}
type CreateBlobTopicResult struct {
CommonResponseResult
}
func NewCreateBlobTopicResult(commonResp *CommonResponseResult) (*CreateBlobTopicResult, error) {
cbrt := &CreateBlobTopicResult{
CommonResponseResult: *commonResp,
}
return cbrt, nil
}
type CreateTupleTopicResult struct {
CommonResponseResult
}
func NewCreateTupleTopicResult(commonResp *CommonResponseResult) (*CreateTupleTopicResult, error) {
cttr := &CreateTupleTopicResult{
CommonResponseResult: *commonResp,
}
return cttr, nil
}
type CreateTopicWithParaResult struct {
CommonResponseResult
}
func NewCreateTopicWithParaResult(commonResp *CommonResponseResult) (*CreateTopicWithParaResult, error) {
ctwp := &CreateTopicWithParaResult{
CommonResponseResult: *commonResp,
}
return ctwp, nil
}
type UpdateTopicResult struct {
CommonResponseResult
}
func NewUpdateTopicResult(commonResp *CommonResponseResult) (*UpdateTopicResult, error) {
utr := &UpdateTopicResult{
CommonResponseResult: *commonResp,
}
return utr, nil
}
type DeleteTopicResult struct {
CommonResponseResult
}
func NewDeleteTopicResult(commonResp *CommonResponseResult) (*DeleteTopicResult, error) {
dtr := &DeleteTopicResult{
CommonResponseResult: *commonResp,
}
return dtr, nil
}
type GetTopicResult struct {
CommonResponseResult
ProjectName string
TopicName string
ShardCount int `json:"ShardCount"`
LifeCycle int `json:"LifeCycle"`
RecordType RecordType `json:"RecordType"`
RecordSchema *RecordSchema `json:"RecordSchema"`
Comment string `json:"Comment"`
CreateTime int64 `json:"CreateTime"`
LastModifyTime int64 `json:"LastModifyTime"`
TopicStatus TopicStatus `json:"Status"`
ExpandMode ExpandMode `json:"ExpandMode"`
}
// for deserialize the RecordSchema
func (gtr *GetTopicResult) UnmarshalJSON(data []byte) error {
msg := &struct {
ShardCount int `json:"ShardCount"`
LifeCycle int `json:"LifeCycle"`
RecordType RecordType `json:"RecordType"`
RecordSchema string `json:"RecordSchema"`
Comment string `json:"Comment"`
CreateTime int64 `json:"CreateTime"`
LastModifyTime int64 `json:"LastModifyTime"`
TopicStatus TopicStatus `json:"Status"`
ExpandMode ExpandMode `json:"ExpandMode"`
}{}
if err := json.Unmarshal(data, msg); err != nil {
return err
}
gtr.ShardCount = msg.ShardCount
gtr.LifeCycle = msg.LifeCycle
gtr.RecordType = msg.RecordType
gtr.Comment = msg.Comment
gtr.CreateTime = msg.CreateTime
gtr.LastModifyTime = msg.LastModifyTime
gtr.TopicStatus = msg.TopicStatus
gtr.ExpandMode = msg.ExpandMode
if msg.RecordType == TUPLE {
rs := &RecordSchema{}
if err := json.Unmarshal([]byte(msg.RecordSchema), rs); err != nil {
return err
}
for idx := range rs.Fields {
rs.Fields[idx].AllowNull = !rs.Fields[idx].AllowNull
}
gtr.RecordSchema = rs
}
return nil
}
func NewGetTopicResult(data []byte, commonResp *CommonResponseResult) (*GetTopicResult, error) {
gr := &GetTopicResult{
CommonResponseResult: *commonResp,
}
if err := json.Unmarshal(data, gr); err != nil {
return nil, err
}
return gr, nil
}
type ListShardResult struct {
CommonResponseResult
Shards []ShardEntry `json:"Shards"`
Protocol string `json:"Protocol"`
IntervalMs int64 `json:"Interval"`
}
func NewListShardResult(data []byte, commonResp *CommonResponseResult) (*ListShardResult, error) {
lsr := &ListShardResult{
CommonResponseResult: *commonResp,
}
if err := json.Unmarshal(data, lsr); err != nil {
return nil, err
}
return lsr, nil
}
type SplitShardResult struct {
CommonResponseResult
NewShards []ShardEntry `json:"NewShards"`
}
func NewSplitShardResult(data []byte, commonResp *CommonResponseResult) (*SplitShardResult, error) {
ssr := &SplitShardResult{
CommonResponseResult: *commonResp,
}
if err := json.Unmarshal(data, ssr); err != nil {
return nil, err
}
return ssr, nil
}
type MergeShardResult struct {
CommonResponseResult
ShardId string `json:"ShardId"`
BeginHashKey string `json:"BeginHashKey"`
EndHashKey string `json:"EndHashKey"`
}
func NewMergeShardResult(data []byte, commonResp *CommonResponseResult) (*MergeShardResult, error) {
ssr := &MergeShardResult{
CommonResponseResult: *commonResp,
}
if err := json.Unmarshal(data, ssr); err != nil {
return nil, err
}
return ssr, nil
}
type ExtendShardResult struct {
CommonResponseResult
}
func NewExtendShardResult(commonResp *CommonResponseResult) (*ExtendShardResult, error) {
esr := &ExtendShardResult{
CommonResponseResult: *commonResp,
}
return esr, nil
}
type GetCursorResult struct {
CommonResponseResult
Cursor string `json:"Cursor"`
RecordTime int64 `json:"RecordTime"`
Sequence int64 `json:"Sequence"`
}
func NewGetCursorResult(data []byte, commonResp *CommonResponseResult) (*GetCursorResult, error) {
gcr := &GetCursorResult{
CommonResponseResult: *commonResp,
}
if err := json.Unmarshal(data, gcr); err != nil {
return nil, err
}
return gcr, nil
}
type PutRecordsResult struct {
CommonResponseResult
FailedRecordCount int `json:"FailedRecordCount"`
FailedRecords []FailedRecord `json:"FailedRecords"`
}
func NewPutRecordsResult(data []byte, commonResp *CommonResponseResult) (*PutRecordsResult, error) {
prr := &PutRecordsResult{
CommonResponseResult: *commonResp,
}
if err := json.Unmarshal(data, prr); err != nil {
return nil, err
}
return prr, nil
}
func NewPutPBRecordsResult(data []byte, commonResp *CommonResponseResult) (*PutRecordsResult, error) {
pr := &PutRecordsResult{
CommonResponseResult: *commonResp,
}
data, err := util.UnwrapMessage(data)
if err != nil {
return nil, err
}
prr := &pbmodel.PutRecordsResponse{}
if err := proto.Unmarshal(data, prr); err != nil {
return nil, err
}
pr.FailedRecordCount = int(*prr.FailedCount)
if pr.FailedRecordCount > 0 {
records := make([]FailedRecord, pr.FailedRecordCount)
for idx, v := range prr.FailedRecords {
records[idx].ErrorCode = *v.ErrorCode
records[idx].ErrorMessage = *v.ErrorMessage
records[idx].Index = int(*v.Index)
}
pr.FailedRecords = records
}
return pr, nil
}
type PutRecordsByShardResult struct {
CommonResponseResult
}
func NewPutRecordsByShardResult(commonResp *CommonResponseResult) (*PutRecordsByShardResult, error) {
prbs := &PutRecordsByShardResult{
CommonResponseResult: *commonResp,
}
return prbs, nil
}
type GetRecordsResult struct {
CommonResponseResult
NextCursor string `json:"NextCursor"`
RecordCount int `json:"RecordCount"`
StartSequence int64 `json:"StartSeq"`
LatestSequence int64 `json:"LatestSeq"`
LatestTime int64 `json:"LatestTime"`
Records []IRecord `json:"Records"`
RecordSchema *RecordSchema `json:"-"`
}
func (grr *GetRecordsResult) UnmarshalJSON(data []byte) error {
msg := &struct {
NextCursor string `json:"NextCursor"`
RecordCount int `json:"RecordCount"`
StartSequence int64 `json:"StartSeq"`
LatestSequence int64 `json:"LatestSeq"`
LatestTime int64 `json:"LatestTime"`
Records []*struct {
SystemTime int64 `json:"SystemTime"`
NextCursor string `json:"NextCursor"`
CurrentCursor string `json:"Cursor"`
Sequence int64 `json:"Sequence"`
Attributes map[string]interface{} `json:"Attributes"`
Data interface{} `json:"Data"`
} `json:"Records"`
}{}
err := json.Unmarshal(data, msg)
if err != nil {
return err
}
grr.NextCursor = msg.NextCursor
grr.RecordCount = msg.RecordCount
grr.StartSequence = msg.StartSequence
grr.LatestSequence = msg.LatestSequence
grr.LatestTime = msg.LatestTime
grr.Records = make([]IRecord, len(msg.Records))
for idx, record := range msg.Records {
if record.Data == nil {
return fmt.Errorf("invalid record response, record data is nil")
}
switch dt := record.Data.(type) {
case []interface{}, []string:
if grr.RecordSchema == nil {
return fmt.Errorf("tuple record type must set record schema")
}
grr.Records[idx] = NewTupleRecord(grr.RecordSchema, record.SystemTime)
case string:
grr.Records[idx] = NewBlobRecord([]byte(dt), record.SystemTime)
default:
return fmt.Errorf("illegal record data type[%T]", dt)
}
if err := grr.Records[idx].FillData(record.Data); err != nil {
return err
}
for key, val := range record.Attributes {
grr.Records[idx].SetAttribute(key, val)
}
br := BaseRecord{
SystemTime: msg.Records[idx].SystemTime,
NextCursor: msg.Records[idx].NextCursor,
Cursor: msg.Records[idx].CurrentCursor,
Sequence: msg.Records[idx].Sequence,
Attributes: msg.Records[idx].Attributes,
}
grr.Records[idx].SetBaseRecord(br)
}
return nil
}
func NewGetRecordsResult(data []byte, schema *RecordSchema, commonResp *CommonResponseResult) (*GetRecordsResult, error) {
grr := &GetRecordsResult{
CommonResponseResult: *commonResp,
RecordSchema: schema,
}
if err := json.Unmarshal(data, grr); err != nil {
return nil, err
}
return grr, nil
}
func NewGetPBRecordsResult(data []byte, schema *RecordSchema, commonResp *CommonResponseResult) (*GetRecordsResult, error) {
data, err := util.UnwrapMessage(data)
if err != nil {
return nil, err
}
grr := &pbmodel.GetRecordsResponse{}
if err := proto.Unmarshal(data, grr); err != nil {
return nil, err
}
result := &GetRecordsResult{
CommonResponseResult: *commonResp,
RecordSchema: schema,
}
if grr.NextCursor != nil {
result.NextCursor = *(grr.NextCursor)
}
if grr.StartSequence != nil {
result.StartSequence = *grr.StartSequence
}
if grr.LatestSequence != nil {
result.LatestSequence = *grr.LatestSequence
}
if grr.LatestTime != nil {
result.LatestTime = *grr.LatestTime
}
if grr.RecordCount != nil {
result.RecordCount = int(*grr.RecordCount)
if result.RecordCount > 0 {
result.Records = make([]IRecord, result.RecordCount)
for idx, record := range grr.Records {
//Tuple topic
if result.RecordSchema != nil {
tr := NewTupleRecord(result.RecordSchema, *record.SystemTime)
if err := fillTupleData(tr, record); err != nil {
return nil, err
}
result.Records[idx] = tr
} else {
br := NewBlobRecord(record.Data.Data[0].Value, *record.SystemTime)
if err := fillBlobData(br, record); err != nil {
return nil, err
}
result.Records[idx] = br
}
}
}
}
return result, nil
}
func fillTupleData(tr *TupleRecord, recordEntry *pbmodel.RecordEntry) error {
if recordEntry.ShardId != nil {
tr.ShardId = *recordEntry.ShardId
}
if recordEntry.HashKey != nil {
tr.HashKey = *recordEntry.HashKey
}
if recordEntry.PartitionKey != nil {
tr.Sequence = *recordEntry.Sequence
}
if recordEntry.Cursor != nil {
tr.Cursor = *recordEntry.Cursor
}
if recordEntry.NextCursor != nil {
tr.NextCursor = *recordEntry.NextCursor
}
if recordEntry.Sequence != nil {
tr.Sequence = *recordEntry.Sequence
}
if recordEntry.SystemTime != nil {
tr.SystemTime = *recordEntry.SystemTime
}
if recordEntry.Attributes != nil {
for _, pair := range recordEntry.Attributes.Attributes {
tr.Attributes[*pair.Key] = *pair.Value
}
}
data := recordEntry.Data.Data
for idx, v := range data {
if v.Value != nil {
tv, err := castValueFromString(string(v.Value), tr.RecordSchema.Fields[idx].Type)
if err != nil {
return err
}
tr.Values[idx] = tv
}
}
return nil
}
func fillBlobData(br *BlobRecord, recordEntry *pbmodel.RecordEntry) error {
if recordEntry.ShardId != nil {
br.ShardId = *recordEntry.ShardId
}
if recordEntry.HashKey != nil {
br.HashKey = *recordEntry.HashKey
}
if recordEntry.PartitionKey != nil {
br.Sequence = *recordEntry.Sequence
}
if recordEntry.Cursor != nil {
br.Cursor = *recordEntry.Cursor
}
if recordEntry.NextCursor != nil {
br.NextCursor = *recordEntry.NextCursor
}
if recordEntry.Sequence != nil {
br.Sequence = *recordEntry.Sequence
}
if recordEntry.SystemTime != nil {
br.SystemTime = *recordEntry.SystemTime
}
if recordEntry.Attributes != nil {
for _, pair := range recordEntry.Attributes.Attributes {
br.Attributes[*pair.Key] = *pair.Value
}
}
br.RawData = recordEntry.Data.Data[0].Value
return nil
}
func NewGetBatchRecordsResult(data []byte, schema *RecordSchema, commonResp *CommonResponseResult, deserializer *batchDeserializer) (*GetRecordsResult, error) {
data, err := util.UnwrapMessage(data)
if err != nil {
return nil, err
}
gbr := &pbmodel.GetBinaryRecordsResponse{}
if err := proto.Unmarshal(data, gbr); err != nil {
return nil, err
}
result := &GetRecordsResult{
CommonResponseResult: *commonResp,
RecordSchema: schema,
}
if gbr.NextCursor != nil {
result.NextCursor = *(gbr.NextCursor)
}
if gbr.StartSequence != nil {
result.StartSequence = *gbr.StartSequence
}
if gbr.LatestSequence != nil {
result.LatestSequence = *gbr.LatestSequence
}
if gbr.LatestTime != nil {
result.LatestTime = *gbr.LatestTime
}
// 这里的RecordCount不是record数量,而是batch的数量
if gbr.RecordCount != nil {
if *gbr.RecordCount > 0 {
result.Records = make([]IRecord, 0, *gbr.RecordCount)
for _, record := range gbr.Records {
meta := &respMeta{
cursor: record.GetCursor(),
nextCursor: record.GetNextCursor(),
sequence: record.GetSequence(),
systemTime: record.GetSystemTime(),
serial: int64(record.GetSerial()),
}
recordList, err := deserializer.deserialize(record.Data, meta)
if err != nil {
return nil, err
}
result.Records = append(result.Records, recordList...)
}
}
}
result.RecordCount = len(result.Records)
return result, nil
}
type AppendFieldResult struct {
CommonResponseResult
}
func NewAppendFieldResult(commonResp *CommonResponseResult) (*AppendFieldResult, error) {
afr := &AppendFieldResult{
CommonResponseResult: *commonResp,
}
return afr, nil
}
type GetMeterInfoResult struct {
CommonResponseResult
ActiveTime int64 `json:"ActiveTime"`
Storage int64 `json:"Storage"`
}
func NewGetMeterInfoResult(data []byte, commonResp *CommonResponseResult) (*GetMeterInfoResult, error) {
gmir := &GetMeterInfoResult{
CommonResponseResult: *commonResp,
}
if err := json.Unmarshal(data, gmir); err != nil {
return nil, err
}
return gmir, nil
}
type ListConnectorResult struct {
CommonResponseResult
ConnectorIds []string `json:"Connectors"`
}
func NewListConnectorResult(data []byte, commonResp *CommonResponseResult) (*ListConnectorResult, error) {
lcr := &ListConnectorResult{
CommonResponseResult: *commonResp,
}
if err := json.Unmarshal(data, lcr); err != nil {
return nil, err
}
return lcr, nil
}
type CreateConnectorResult struct {
CommonResponseResult
ConnectorId string `json:"ConnectorId"`
}
func NewCreateConnectorResult(data []byte, commonResp *CommonResponseResult) (*CreateConnectorResult, error) {
ccr := &CreateConnectorResult{
CommonResponseResult: *commonResp,
}
if err := json.Unmarshal(data, ccr); err != nil {
return nil, err
}
return ccr, nil
}
type UpdateConnectorResult struct {
CommonResponseResult
}
func NewUpdateConnectorResult(commonResp *CommonResponseResult) (*UpdateConnectorResult, error) {
ucr := &UpdateConnectorResult{
CommonResponseResult: *commonResp,
}
return ucr, nil
}
type DeleteConnectorResult struct {
CommonResponseResult
}
func NewDeleteConnectorResult(commonResp *CommonResponseResult) (*DeleteConnectorResult, error) {
dcr := &DeleteConnectorResult{
CommonResponseResult: *commonResp,
}
return dcr, nil
}
type GetConnectorResult struct {
CommonResponseResult
CreateTime int64 `json:"CreateTime"`
LastModifyTime int64 `json:"LastModifyTime"`
ConnectorId string `json:"ConnectorId"`
ClusterAddress string `json:"ClusterAddress"`
Type ConnectorType `json:"Type"`
State ConnectorState `json:"State"`
ColumnFields []string `json:"ColumnFields"`
ExtraConfig map[string]string `json:"ExtraInfo"`
Creator string `json:"Creator"`
Owner string `json:"Owner"`
Config interface{} `json:"Config"`
}
func NewGetConnectorResult(data []byte, commonResp *CommonResponseResult) (*GetConnectorResult, error) {
cType := &struct {
Type ConnectorType `json:"Type"`
}{}
if err := json.Unmarshal(data, cType); err != nil {
return nil, err
}
switch cType.Type {
case SinkOdps:
return unmarshalGetOdpsConnector(commonResp, data)
case SinkOss:
return unmarshalGetOssConnector(commonResp, data)
case SinkEs:
return unmarshalGetEsConnector(commonResp, data)
case SinkAds:
return unmarshalGetAdsConnector(commonResp, data)
case SinkMysql:
return unmarshalGetMysqlConnector(commonResp, data)
case SinkFc:
return unmarshalGetFcConnector(commonResp, data)
case SinkOts:
return unmarshalGetOtsConnector(commonResp, data)
case SinkDatahub:
return unmarshalGetDatahubConnector(commonResp, data)
case SinkHologres:
return unmarshalGetHologresConnector(commonResp, data)
default:
return nil, fmt.Errorf("not support connector type %s", cType.Type.String())
}
}
type GetConnectorDoneTimeResult struct {
CommonResponseResult
DoneTime int64 `json:"DoneTime"`
TimeZone string `json:"TimeZone"`
TimeWindow int `json:"TimeWindow"`
}
func NewGetConnectorDoneTimeResult(data []byte, commonResp *CommonResponseResult) (*GetConnectorDoneTimeResult, error) {
gcdt := &GetConnectorDoneTimeResult{
CommonResponseResult: *commonResp,
}
if err := json.Unmarshal(data, gcdt); err != nil {
return nil, err
}
return gcdt, nil
}
type GetConnectorShardStatusResult struct {
CommonResponseResult
ShardStatus map[string]ConnectorShardStatusEntry `json:"ShardStatusInfos"`
}
func NewGetConnectorShardStatusResult(data []byte, commonResp *CommonResponseResult) (*GetConnectorShardStatusResult, error) {
gcss := &GetConnectorShardStatusResult{
CommonResponseResult: *commonResp,
}
if err := json.Unmarshal(data, gcss); err != nil {
return nil, err
}
return gcss, nil
}
type GetConnectorShardStatusByShardResult struct {
CommonResponseResult
ConnectorShardStatusEntry
}
func NewGetConnectorShardStatusByShardResult(data []byte, commonResp *CommonResponseResult) (*GetConnectorShardStatusByShardResult, error) {
csse := &ConnectorShardStatusEntry{}
if err := json.Unmarshal(data, csse); err != nil {
return nil, err
}
gcss := &GetConnectorShardStatusByShardResult{
CommonResponseResult: *commonResp,
ConnectorShardStatusEntry: *csse,
}
return gcss, nil
}
type ReloadConnectorResult struct {
CommonResponseResult
}
func NewReloadConnectorResult(commonResp *CommonResponseResult) (*ReloadConnectorResult, error) {
rcr := &ReloadConnectorResult{
CommonResponseResult: *commonResp,
}
return rcr, nil
}
type ReloadConnectorByShardResult struct {
CommonResponseResult
}
func NewReloadConnectorByShardResult(commonResp *CommonResponseResult) (*ReloadConnectorByShardResult, error) {
rcsr := &ReloadConnectorByShardResult{
CommonResponseResult: *commonResp,
}
return rcsr, nil
}
type UpdateConnectorStateResult struct {
CommonResponseResult
}
func NewUpdateConnectorStateResult(commonResp *CommonResponseResult) (*UpdateConnectorStateResult, error) {
ucsr := &UpdateConnectorStateResult{
CommonResponseResult: *commonResp,
}
return ucsr, nil
}
type UpdateConnectorOffsetResult struct {
CommonResponseResult
}
func NewUpdateConnectorOffsetResult(commonResp *CommonResponseResult) (*UpdateConnectorOffsetResult, error) {
ucor := &UpdateConnectorOffsetResult{
CommonResponseResult: *commonResp,
}
return ucor, nil
}
type AppendConnectorFieldResult struct {
CommonResponseResult
}
func NewAppendConnectorFieldResult(commonResp *CommonResponseResult) (*AppendConnectorFieldResult, error) {
acfr := &AppendConnectorFieldResult{
CommonResponseResult: *commonResp,
}
return acfr, nil
}
type ListSubscriptionResult struct {
CommonResponseResult
TotalCount int64 `json:"TotalCount"`
Subscriptions []SubscriptionEntry `json:"Subscriptions"`
}
func NewListSubscriptionResult(data []byte, commonResp *CommonResponseResult) (*ListSubscriptionResult, error) {
lsr := &ListSubscriptionResult{
CommonResponseResult: *commonResp,
}
if err := json.Unmarshal(data, lsr); err != nil {
return nil, err
}
return lsr, nil
}
type CreateSubscriptionResult struct {
CommonResponseResult
SubId string `json:"SubId"`
}
func NewCreateSubscriptionResult(data []byte, commonResp *CommonResponseResult) (*CreateSubscriptionResult, error) {
csr := &CreateSubscriptionResult{
CommonResponseResult: *commonResp,
}
if err := json.Unmarshal(data, csr); err != nil {
return nil, err
}
return csr, nil
}
type UpdateSubscriptionResult struct {
CommonResponseResult
}
func NewUpdateSubscriptionResult(commonResp *CommonResponseResult) (*UpdateSubscriptionResult, error) {
usr := &UpdateSubscriptionResult{
CommonResponseResult: *commonResp,
}
return usr, nil
}
type DeleteSubscriptionResult struct {
CommonResponseResult
}
func NewDeleteSubscriptionResult(commonResp *CommonResponseResult) (*DeleteSubscriptionResult, error) {
dsr := &DeleteSubscriptionResult{
CommonResponseResult: *commonResp,
}
return dsr, nil
}
type GetSubscriptionResult struct {
CommonResponseResult
SubscriptionEntry
}
func NewGetSubscriptionResult(data []byte, commonResp *CommonResponseResult) (*GetSubscriptionResult, error) {
gsr := &GetSubscriptionResult{
CommonResponseResult: *commonResp,
}
if err := json.Unmarshal(data, gsr); err != nil {
return nil, err
}
return gsr, nil
}
type UpdateSubscriptionStateResult struct {
CommonResponseResult
}
func NewUpdateSubscriptionStateResult(commonResp *CommonResponseResult) (*UpdateSubscriptionStateResult, error) {
ussr := &UpdateSubscriptionStateResult{
CommonResponseResult: *commonResp,
}
return ussr, nil
}
type OpenSubscriptionSessionResult struct {
CommonResponseResult
Offsets map[string]SubscriptionOffset `json:"Offsets"`
}
func NewOpenSubscriptionSessionResult(data []byte, commonResp *CommonResponseResult) (*OpenSubscriptionSessionResult, error) {
ossr := &OpenSubscriptionSessionResult{
CommonResponseResult: *commonResp,
}
if err := json.Unmarshal(data, ossr); err != nil {
return nil, err
}
return ossr, nil
}
type GetSubscriptionOffsetResult struct {
CommonResponseResult
Offsets map[string]SubscriptionOffset `json:"Offsets"`
}
func NewGetSubscriptionOffsetResult(data []byte, commonResp *CommonResponseResult) (*GetSubscriptionOffsetResult, error) {
gsor := &GetSubscriptionOffsetResult{
CommonResponseResult: *commonResp,
}
if err := json.Unmarshal(data, gsor); err != nil {
return nil, err
}
return gsor, nil
}
type CommitSubscriptionOffsetResult struct {
CommonResponseResult
}
func NewCommitSubscriptionOffsetResult(commonResp *CommonResponseResult) (*CommitSubscriptionOffsetResult, error) {
csor := &CommitSubscriptionOffsetResult{
CommonResponseResult: *commonResp,
}
return csor, nil
}
type ResetSubscriptionOffsetResult struct {
CommonResponseResult
}
func NewResetSubscriptionOffsetResult(commonResp *CommonResponseResult) (*ResetSubscriptionOffsetResult, error) {
rsor := &ResetSubscriptionOffsetResult{
CommonResponseResult: *commonResp,
}
return rsor, nil
}
type HeartbeatResult struct {
CommonResponseResult
PlanVersion int64 `json:"PlanVersion"`
ShardList []string `json:"ShardList"`
TotalPlan string `json:"TotalPlan"`
}
func NewHeartbeatResult(data []byte, commonResp *CommonResponseResult) (*HeartbeatResult, error) {
hr := &HeartbeatResult{
CommonResponseResult: *commonResp,
}
if err := json.Unmarshal(data, hr); err != nil {
return nil, err
}
return hr, nil
}
type JoinGroupResult struct {
CommonResponseResult
ConsumerId string `json:"ConsumerId"`
VersionId int64 `json:"VersionId"`
SessionTimeout int64 `json:"SessionTimeout"`
}
func NewJoinGroupResult(data []byte, commonResp *CommonResponseResult) (*JoinGroupResult, error) {
jgr := &JoinGroupResult{
CommonResponseResult: *commonResp,
}
if err := json.Unmarshal(data, jgr); err != nil {
return nil, err
}
return jgr, nil
}
type SyncGroupResult struct {
CommonResponseResult
}
func NewSyncGroupResult(commonResp *CommonResponseResult) (*SyncGroupResult, error) {
sgr := &SyncGroupResult{
CommonResponseResult: *commonResp,
}
return sgr, nil
}
type LeaveGroupResult struct {
CommonResponseResult
}
func NewLeaveGroupResult(commonResp *CommonResponseResult) (*LeaveGroupResult, error) {
lgr := &LeaveGroupResult{
CommonResponseResult: *commonResp,
}
return lgr, nil
}
type ListTopicSchemaResult struct {
CommonResponseResult
SchemaInfoList []RecordSchemaInfo `json:"RecordSchemaList"`
}
// for deserialize the RecordSchema
func (gtr *ListTopicSchemaResult) UnmarshalJSON(data []byte) error {
type RecordSchemaInfoHelper struct {
VersionId int `json:"VersionId"`
RecordSchema string `json:"RecordSchema"`
}
msg := &struct {
SchemaInfoList []RecordSchemaInfoHelper `json:"RecordSchemaList"`
}{}
if err := json.Unmarshal(data, msg); err != nil {
return err
}
for _, info := range msg.SchemaInfoList {
schema := &RecordSchema{}
if err := json.Unmarshal([]byte(info.RecordSchema), schema); err != nil {
return err
}
for idx := range schema.Fields {
schema.Fields[idx].AllowNull = !schema.Fields[idx].AllowNull
}
schemaInfo := RecordSchemaInfo{
VersionId: info.VersionId,
RecordSchema: *schema,
}
gtr.SchemaInfoList = append(gtr.SchemaInfoList, schemaInfo)
}
return nil
}
func NewListTopicSchemaResult(data []byte, commonResp *CommonResponseResult) (*ListTopicSchemaResult, error) {
ret := &ListTopicSchemaResult{
CommonResponseResult: *commonResp,
}
if err := json.Unmarshal(data, ret); err != nil {
return nil, err
}
return ret, nil
}
type GetTopicSchemaResult struct {
CommonResponseResult
VersionId int `json:"VersionId"`
RecordSchema RecordSchema `json:"RecordSchema"`
}
func (gtr *GetTopicSchemaResult) UnmarshalJSON(data []byte) error {
msg := &struct {
VersionId int `json:"VersionId"`
RecordSchema string `json:"RecordSchema"`
}{}
if err := json.Unmarshal(data, msg); err != nil {
return err
}
schema := &RecordSchema{}
if err := json.Unmarshal([]byte(msg.RecordSchema), schema); err != nil {
return err
}
for idx := range schema.Fields {
schema.Fields[idx].AllowNull = !schema.Fields[idx].AllowNull
}
gtr.VersionId = msg.VersionId
gtr.RecordSchema = *schema
return nil
}
func NewGetTopicSchemaResult(data []byte, commonResp *CommonResponseResult) (*GetTopicSchemaResult, error) {
ret := &GetTopicSchemaResult{
CommonResponseResult: *commonResp,
}
if err := json.Unmarshal(data, ret); err != nil {
return nil, err
}
return ret, nil
}
type RegisterTopicSchemaResult struct {
CommonResponseResult
VersionId int `json:"VersionId"`
}
func NewRegisterTopicSchemaResult(data []byte, commonResp *CommonResponseResult) (*RegisterTopicSchemaResult, error) {
ret := &RegisterTopicSchemaResult{
CommonResponseResult: *commonResp,
}
if err := json.Unmarshal(data, ret); err != nil {
return nil, err
}
return ret, nil
}
type DeleteTopicSchemaResult struct {
CommonResponseResult
}
func NewDeleteTopicSchemaResult(commonResp *CommonResponseResult) (*DeleteTopicSchemaResult, error) {
ret := &DeleteTopicSchemaResult{
CommonResponseResult: *commonResp,
}
return ret, nil
}