datahub/requestmodel.go (531 lines of code) (raw):
package datahub
import (
"encoding/json"
"fmt"
"reflect"
"github.com/aliyun/aliyun-datahub-sdk-go/datahub/pbmodel"
"github.com/aliyun/aliyun-datahub-sdk-go/datahub/util"
"github.com/golang/protobuf/proto"
)
// handel the http request
type RequestModel interface {
// serialize the requestModel and maybe need add some message on http header
requestBodyEncode() ([]byte, error)
}
// empty request
type EmptyRequest struct {
}
func (br *EmptyRequest) requestBodyEncode() ([]byte, error) {
return nil, nil
}
type CreateProjectRequest struct {
Comment string `json:"Comment"`
}
func (cpr *CreateProjectRequest) requestBodyEncode() ([]byte, error) {
return json.Marshal(cpr)
}
type UpdateProjectRequest struct {
Comment string `json:"Comment"`
}
func (upr *UpdateProjectRequest) requestBodyEncode() ([]byte, error) {
return json.Marshal(upr)
}
type UpdateProjectVpcWhitelistRequest struct {
VpcIds string `json:"VpcIds"`
}
func (upv *UpdateProjectVpcWhitelistRequest) requestBodyEncode() ([]byte, error) {
return json.Marshal(upv)
}
type CreateTopicRequest struct {
Action string `json:"Action"`
ShardCount int `json:"ShardCount"`
Lifecycle int `json:"Lifecycle"`
RecordType RecordType `json:"RecordType"`
RecordSchema *RecordSchema `json:"RecordSchema,omitempty"`
Comment string `json:"Comment"`
ExpandMode ExpandMode `json:"ExpandMode"`
}
func (ctr *CreateTopicRequest) MarshalJSON() ([]byte, error) {
msg := &struct {
Action string `json:"Action"`
ShardCount int `json:"ShardCount"`
Lifecycle int `json:"Lifecycle"`
RecordType RecordType `json:"RecordType"`
RecordSchema string `json:"RecordSchema,omitempty"`
Comment string `json:"Comment"`
ExpandMode ExpandMode `json:"ExpandMode"`
}{
Action: ctr.Action,
ShardCount: ctr.ShardCount,
Lifecycle: ctr.Lifecycle,
RecordType: ctr.RecordType,
Comment: ctr.Comment,
ExpandMode: ctr.ExpandMode,
}
switch ctr.RecordType {
case TUPLE:
msg.RecordSchema = ctr.RecordSchema.String()
default:
msg.RecordSchema = ""
}
return json.Marshal(msg)
}
func (ctr *CreateTopicRequest) requestBodyEncode() ([]byte, error) {
return json.Marshal(ctr)
}
type UpdateTopicRequest struct {
Comment string `json:"Comment,omitempty"`
Lifecycle int `json:"Lifecycle,omitempty"`
}
func (utr *UpdateTopicRequest) requestBodyEncode() ([]byte, error) {
return json.Marshal(utr)
}
type SplitShardRequest struct {
Action string `json:"Action"`
ShardId string `json:"ShardId"`
SplitKey string `json:"SplitKey,omitempty"`
}
func (ssr *SplitShardRequest) requestBodyEncode() ([]byte, error) {
return json.Marshal(ssr)
}
type MergeShardRequest struct {
Action string `json:"Action"`
ShardId string `json:"ShardId"`
AdjacentShardId string `json:"AdjacentShardId"`
}
func (msr *MergeShardRequest) requestBodyEncode() ([]byte, error) {
return json.Marshal(msr)
}
type ExtendShardRequest struct {
Action string `json:"Action"`
ExtendMode string `json:"ExtendMode"`
ShardCount int `json:"ShardNumber"`
}
func (esr *ExtendShardRequest) requestBodyEncode() ([]byte, error) {
return json.Marshal(esr)
}
type GetCursorRequest struct {
Action string `json:"Action"`
CursorType CursorType `json:"Type"`
SystemTime int64 `json:"SystemTime"`
Sequence int64 `json:"Sequence"`
}
func (gcr *GetCursorRequest) requestBodyEncode() ([]byte, error) {
type ReqMsg struct {
Action string `json:"Action"`
Type CursorType `json:"Type"`
}
reqMsg := ReqMsg{
Action: gcr.Action,
Type: gcr.CursorType,
}
switch gcr.CursorType {
case OLDEST, LATEST:
return json.Marshal(reqMsg)
case SYSTEM_TIME:
return json.Marshal(struct {
ReqMsg
SystemTime int64 `json:"SystemTime"`
}{
ReqMsg: reqMsg,
SystemTime: gcr.SystemTime,
})
case SEQUENCE:
return json.Marshal(struct {
ReqMsg
Sequence int64 `json:"Sequence"`
}{
ReqMsg: reqMsg,
Sequence: gcr.Sequence,
})
default:
return nil, fmt.Errorf("cursor not support type %s", gcr.CursorType)
}
}
type PutRecordsRequest struct {
Action string `json:"Action"`
Records []IRecord `json:"Records"`
}
func (prr *PutRecordsRequest) requestBodyEncode() ([]byte, error) {
return json.Marshal(prr)
}
func (ptr *PutRecordsRequest) MarshalJSON() ([]byte, error) {
msg := &struct {
Action string `json:"Action"`
Records []RecordEntry `json:"Records"`
}{
Action: ptr.Action,
Records: make([]RecordEntry, len(ptr.Records)),
}
for idx, val := range ptr.Records {
msg.Records[idx].Data = val.GetData()
msg.Records[idx].BaseRecord = val.GetBaseRecord()
}
return json.Marshal(msg)
}
type GetRecordRequest struct {
Action string `json:"Action"`
Cursor string `json:"Cursor"`
Limit int `json:"Limit"`
}
func (grr *GetRecordRequest) requestBodyEncode() ([]byte, error) {
return json.Marshal(grr)
}
type AppendFieldRequest struct {
Action string `json:"Action"`
FieldName string `json:"FieldName"`
FieldType FieldType `json:"FieldType"`
}
func (afr *AppendFieldRequest) requestBodyEncode() ([]byte, error) {
return json.Marshal(afr)
}
type GetMeterInfoRequest struct {
Action string `json:"Action"`
}
func (gmir *GetMeterInfoRequest) requestBodyEncode() ([]byte, error) {
return json.Marshal(gmir)
}
type CreateConnectorRequest struct {
Action string `json:"Action"`
Type ConnectorType `json:"Type"`
SinkStartTime int64 `json:"SinkStartTime"`
ColumnFields []string `json:"ColumnFields"`
ColumnNameMap map[string]string `json:"ColumnNameMap"`
Config interface{} `json:"Config"`
}
func (ccr *CreateConnectorRequest) requestBodyEncode() ([]byte, error) {
switch ccr.Type {
case SinkOdps:
return marshalCreateOdpsConnector(ccr)
case SinkOss:
return marshalCreateOssConnector(ccr)
case SinkEs:
return marshalCreateEsConnector(ccr)
case SinkAds:
return marshalCreateAdsConnector(ccr)
case SinkMysql:
return marshalCreateMysqlConnector(ccr)
case SinkFc:
return marshalCreateFcConnector(ccr)
case SinkOts:
return marshalCreateOtsConnector(ccr)
case SinkDatahub:
return marshalCreateDatahubConnector(ccr)
case SinkHologres:
return marshalCreateHologresConnector(ccr)
default:
return nil, fmt.Errorf("not support connector type config: %s", ccr.Type.String())
}
}
type UpdateConnectorRequest struct {
Action string `json:"Action"`
ColumnFields []string `json:"ColumnFields"`
ColumnNameMap map[string]string `json:"ColumnNameMap"`
Config interface{} `json:"Config"`
}
func (ucr *UpdateConnectorRequest) requestBodyEncode() ([]byte, error) {
if ucr.Config == nil {
return marshalUpdateConnector(ucr)
}
switch ucr.Config.(type) {
case SinkOdpsConfig:
return marshalUpdateOdpsConnector(ucr)
case SinkOssConfig:
return marshalUpdateOssConnector(ucr)
case SinkEsConfig:
return marshalUpdateEsConnector(ucr)
case SinkAdsConfig:
return marshalUpdateAdsConnector(ucr)
case SinkMysqlConfig:
return marshalUpdateMysqlConnector(ucr)
case SinkFcConfig:
return marshalUpdateFcConnector(ucr)
case SinkOtsConfig:
return marshalUpdateOtsConnector(ucr)
case SinkDatahubConfig:
return marshalUpdateDatahubConnector(ucr)
case SinkHologresConfig:
return marshalUpdateHologresConnector(ucr)
default:
return nil, fmt.Errorf("this connector type not support, %t", reflect.TypeOf(ucr.Config))
}
}
type ReloadConnectorRequest struct {
Action string `json:"Action"`
ShardId string `json:"ShardId,omitempty"`
}
func (rcr *ReloadConnectorRequest) requestBodyEncode() ([]byte, error) {
return json.Marshal(rcr)
}
type UpdateConnectorStateRequest struct {
Action string `json:"Action"`
State ConnectorState `json:"State"`
}
func (ucsr *UpdateConnectorStateRequest) requestBodyEncode() ([]byte, error) {
return json.Marshal(ucsr)
}
type UpdateConnectorOffsetRequest struct {
Action string `json:"Action"`
ShardId string `json:"ShardId"`
Timestamp int64 `json:"CurrentTime"`
Sequence int64 `json:"CurrentSequence"`
}
func (ucor *UpdateConnectorOffsetRequest) requestBodyEncode() ([]byte, error) {
return json.Marshal(ucor)
}
type GetConnectorShardStatusRequest struct {
Action string `json:"Action"`
ShardId string `json:"ShardId,omitempty"`
}
func (gcss *GetConnectorShardStatusRequest) requestBodyEncode() ([]byte, error) {
return json.Marshal(gcss)
}
type AppendConnectorFieldRequest struct {
Action string `json:"Action"`
FieldName string `json:"FieldName"`
}
func (acfr *AppendConnectorFieldRequest) requestBodyEncode() ([]byte, error) {
return json.Marshal(acfr)
}
type CreateSubscriptionRequest struct {
Action string `json:"Action"`
Comment string `json:"Comment"`
}
func (csr *CreateSubscriptionRequest) requestBodyEncode() ([]byte, error) {
return json.Marshal(csr)
}
type ListSubscriptionRequest struct {
Action string `json:"Action"`
PageIndex int `json:"PageIndex"`
PageSize int `json:"PageSize"`
}
func (lsr *ListSubscriptionRequest) requestBodyEncode() ([]byte, error) {
return json.Marshal(lsr)
}
type UpdateSubscriptionRequest struct {
//Action string `json:"Action"`
Comment string `json:"Comment"`
}
func (usr *UpdateSubscriptionRequest) requestBodyEncode() ([]byte, error) {
return json.Marshal(usr)
}
type UpdateSubscriptionStateRequest struct {
State SubscriptionState `json:"State"`
}
func (ussr *UpdateSubscriptionStateRequest) requestBodyEncode() ([]byte, error) {
return json.Marshal(ussr)
}
type OpenSubscriptionSessionRequest struct {
Action string `json:"Action"`
ShardIds []string `json:"ShardIds"`
}
func (ossr *OpenSubscriptionSessionRequest) requestBodyEncode() ([]byte, error) {
return json.Marshal(ossr)
}
type GetSubscriptionOffsetRequest struct {
Action string `json:"Action"`
ShardIds []string `json:"ShardIds"`
}
func (gsor *GetSubscriptionOffsetRequest) requestBodyEncode() ([]byte, error) {
return json.Marshal(gsor)
}
type CommitSubscriptionOffsetRequest struct {
Action string `json:"Action"`
Offsets map[string]SubscriptionOffset `json:"Offsets"`
}
func (csor *CommitSubscriptionOffsetRequest) requestBodyEncode() ([]byte, error) {
return json.Marshal(csor)
}
type ResetSubscriptionOffsetRequest struct {
Action string `json:"Action"`
Offsets map[string]SubscriptionOffset `json:"Offsets"`
}
func (rsor *ResetSubscriptionOffsetRequest) requestBodyEncode() ([]byte, error) {
return json.Marshal(rsor)
}
type HeartbeatRequest struct {
Action string `json:"Action"`
ConsumerId string `json:"ConsumerId"`
VersionId int64 `json:"VersionId"`
HoldShardList []string `json:"HoldShardList,omitempty"`
ReadEndShardList []string `json:"ReadEndShardList,omitempty"`
}
func (hr *HeartbeatRequest) requestBodyEncode() ([]byte, error) {
return json.Marshal(hr)
}
type JoinGroupRequest struct {
Action String `json:"Action"`
SessionTimeout int64 `json:"SessionTimeout"`
}
func (jgr *JoinGroupRequest) requestBodyEncode() ([]byte, error) {
return json.Marshal(jgr)
}
type SyncGroupRequest struct {
Action string `json:"Action"`
ConsumerId string `json:"ConsumerId"`
VersionId int64 `json:"VersionId"`
ReleaseShardList []string `json:"ReleaseShardList,omitempty"`
ReadEndShardList []string `json:"ReadEndShardList,omitempty"`
}
func (sgr *SyncGroupRequest) requestBodyEncode() ([]byte, error) {
return json.Marshal(sgr)
}
type LeaveGroupRequest struct {
Action string `json:"Action"`
ConsumerId string `json:"ConsumerId"`
VersionId int64 `json:"VersionId"`
}
func (lgr *LeaveGroupRequest) requestBodyEncode() ([]byte, error) {
return json.Marshal(lgr)
}
type ListTopicSchemaRequest struct {
Action string `json:"Action"`
}
func (lts *ListTopicSchemaRequest) requestBodyEncode() ([]byte, error) {
return json.Marshal(lts)
}
type GetTopicSchemaRequest struct {
Action string `json:"Action"`
VersionId int `json:"VersionId"`
RecordSchema *RecordSchema `json:"RecordSchema,omitempty"`
}
func (gts *GetTopicSchemaRequest) requestBodyEncode() ([]byte, error) {
msg := &struct {
Action string `json:"Action"`
VersionId int `json:"VersionId"`
RecordSchema string `json:"RecordSchema,omitempty"`
}{
Action: gts.Action,
VersionId: gts.VersionId,
}
if gts.RecordSchema != nil {
msg.RecordSchema = gts.RecordSchema.String()
}
return json.Marshal(msg)
}
type RegisterTopicSchemaRequest struct {
Action string `json:"Action"`
RecordSchema *RecordSchema `json:"RecordSchema"`
}
func (rts *RegisterTopicSchemaRequest) requestBodyEncode() ([]byte, error) {
msg := &struct {
Action string `json:"Action"`
RecordSchema string `json:"RecordSchema,omitempty"`
}{
Action: rts.Action,
}
if rts.RecordSchema != nil {
msg.RecordSchema = rts.RecordSchema.String()
}
return json.Marshal(msg)
}
type DeleteTopicSchemaRequest struct {
Action string `json:"Action"`
VersionId int `json:"VersionId"`
}
func (lgr *DeleteTopicSchemaRequest) requestBodyEncode() ([]byte, error) {
return json.Marshal(lgr)
}
type PutPBRecordsRequest struct {
Records []IRecord `json:"Records"`
}
func (pr *PutPBRecordsRequest) requestBodyEncode() ([]byte, error) {
res := make([]*pbmodel.RecordEntry, len(pr.Records))
for idx, val := range pr.Records {
bRecord := val.GetBaseRecord()
data := val.GetData()
fds := make([]*pbmodel.FieldData, 0)
switch val := data.(type) {
case []byte:
fd := &pbmodel.FieldData{
Value: val,
}
fds = append(fds, fd)
default:
v, ok := data.([]interface{})
if !ok {
return nil, fmt.Errorf("data format is invalid")
}
for _, str := range v {
fd := &pbmodel.FieldData{}
if str == nil {
fd.Value = nil
} else {
fd.Value = []byte(fmt.Sprintf("%s", str))
}
fds = append(fds, fd)
}
}
rd := &pbmodel.RecordData{
Data: fds,
}
recordEntry := &pbmodel.RecordEntry{
ShardId: proto.String(bRecord.ShardId),
Data: rd,
}
if len(bRecord.Attributes) > 0 {
sps := make([]*pbmodel.StringPair, len(bRecord.Attributes))
index := 0
for k, v := range bRecord.Attributes {
strv := fmt.Sprintf("%v", v)
sp := &pbmodel.StringPair{
Key: proto.String(k),
Value: proto.String(strv),
}
sps[index] = sp
index++
}
ra := &pbmodel.RecordAttributes{
Attributes: sps,
}
recordEntry.Attributes = ra
}
res[idx] = recordEntry
}
prr := &pbmodel.PutRecordsRequest{
Records: res,
}
buf, err := proto.Marshal(prr)
if err != nil {
return nil, err
}
x := util.WrapMessage(buf)
return x, nil
}
type GetPBRecordRequest struct {
Cursor string `json:"Cursor"`
Limit int `json:"Limit"`
}
func (gpr *GetPBRecordRequest) requestBodyEncode() ([]byte, error) {
limit := int32(gpr.Limit)
grr := &pbmodel.GetRecordsRequest{
Cursor: &gpr.Cursor,
Limit: &limit,
}
buf, err := proto.Marshal(grr)
if err != nil {
return nil, err
}
wBuf := util.WrapMessage(buf)
return wBuf, nil
}
type PutBatchRecordsRequest struct {
serializer *batchSerializer
Records []IRecord
}
func (pbr *PutBatchRecordsRequest) requestBodyEncode() ([]byte, error) {
batchBuf, err := pbr.serializer.serialize(pbr.Records)
if err != nil {
return nil, err
}
entry := &pbmodel.BinaryRecordEntry{
Data: batchBuf,
}
protoReq := &pbmodel.PutBinaryRecordsRequest{
Records: []*pbmodel.BinaryRecordEntry{entry},
}
buf, err := proto.Marshal(protoReq)
if err != nil {
return nil, err
}
return util.WrapMessage(buf), nil
}
type GetBatchRecordRequest struct {
GetPBRecordRequest
}