datahub/implement.go (1,546 lines of code) (raw):
package datahub
import (
"fmt"
"time"
"github.com/aliyun/aliyun-datahub-sdk-go/datahub/util"
)
type DataHub struct {
Client *RestClient
// for batch client
cType CompressorType
schemaClient *schemaRegistryClient
}
// ListProjects list all projects
func (datahub *DataHub) ListProject() (*ListProjectResult, error) {
path := projectsPath
reqPara := &RequestParameter{
Header: map[string]string{httpHeaderContentType: httpJsonContent},
}
responseBody, commonResp, err := datahub.Client.Get(path, reqPara)
if err != nil {
return nil, err
}
return NewListProjectResult(responseBody, commonResp)
}
// ListProjects list projects with filter
func (datahub *DataHub) ListProjectWithFilter(filter string) (*ListProjectResult, error) {
path := projectsPath
reqPara := &RequestParameter{
Header: map[string]string{httpHeaderContentType: httpJsonContent},
Query: map[string]string{httpFilterQuery: filter},
}
responseBody, commonResp, err := datahub.Client.Get(path, reqPara)
if err != nil {
return nil, err
}
return NewListProjectResult(responseBody, commonResp)
}
// CreateProject create new project
func (datahub *DataHub) CreateProject(projectName, comment string) (*CreateProjectResult, error) {
if !util.CheckProjectName(projectName) {
return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
}
if !util.CheckComment(comment) {
return nil, NewInvalidParameterErrorWithMessage(commentInvalid)
}
path := fmt.Sprintf(projectPath, projectName)
reqPara := &RequestParameter{
Header: map[string]string{httpHeaderContentType: httpJsonContent},
}
requestBody := &CreateProjectRequest{
Comment: comment,
}
_, commonResp, err := datahub.Client.Post(path, requestBody, reqPara)
if err != nil {
return nil, err
}
return NewCreateProjectResult(commonResp)
}
// UpdateProject update project
func (datahub *DataHub) UpdateProject(projectName, comment string) (*UpdateProjectResult, error) {
if !util.CheckProjectName(projectName) {
return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
}
if !util.CheckComment(comment) {
return nil, NewInvalidParameterErrorWithMessage(commentInvalid)
}
path := fmt.Sprintf(projectPath, projectName)
reqPara := &RequestParameter{
Header: map[string]string{httpHeaderContentType: httpJsonContent},
}
requestBody := &UpdateProjectRequest{
Comment: comment,
}
_, commonResp, err := datahub.Client.Put(path, requestBody, reqPara)
if err != nil {
return nil, err
}
return NewUpdateProjectResult(commonResp)
}
// DeleteProject delete project
func (datahub *DataHub) DeleteProject(projectName string) (*DeleteProjectResult, error) {
if !util.CheckProjectName(projectName) {
return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
}
path := fmt.Sprintf(projectPath, projectName)
reqPara := &RequestParameter{
Header: map[string]string{httpHeaderContentType: httpJsonContent},
}
_, commonResp, err := datahub.Client.Delete(path, reqPara)
if err != nil {
return nil, err
}
return NewDeleteProjectResult(commonResp)
}
// GetProject get a project deatil named the given name
func (datahub *DataHub) GetProject(projectName string) (*GetProjectResult, error) {
if !util.CheckProjectName(projectName) {
return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
}
path := fmt.Sprintf(projectPath, projectName)
reqPara := &RequestParameter{
Header: map[string]string{httpHeaderContentType: httpJsonContent},
}
respBody, commonResp, err := datahub.Client.Get(path, reqPara)
if err != nil {
return nil, err
}
result, err := NewGetProjectResult(respBody, commonResp)
if err != nil {
return nil, err
}
result.ProjectName = projectName
return result, nil
}
// Update project vpc white list.
func (datahub *DataHub) UpdateProjectVpcWhitelist(projectName, vpcIds string) (*UpdateProjectVpcWhitelistResult, error) {
if !util.CheckProjectName(projectName) {
return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
}
path := fmt.Sprintf(projectPath, projectName)
reqPara := &RequestParameter{
Header: map[string]string{httpHeaderContentType: httpJsonContent},
}
requestBody := &UpdateProjectVpcWhitelistRequest{
VpcIds: vpcIds,
}
_, commonResp, err := datahub.Client.Put(path, requestBody, reqPara)
if err != nil {
return nil, err
}
return NewUpdateProjectVpcWhitelistResult(commonResp)
}
func (datahub *DataHub) WaitAllShardsReady(projectName, topicName string) bool {
return datahub.WaitAllShardsReadyWithTime(projectName, topicName, minWaitingTimeInMs/1000)
}
func (datahub *DataHub) WaitAllShardsReadyWithTime(projectName, topicName string, timeout int64) bool {
ready := make(chan bool)
if timeout > 0 {
go func(timeout int64) {
time.Sleep(time.Duration(timeout) * time.Second)
ready <- false
}(timeout)
}
go func(datahub DataHubApi) {
for {
ls, err := datahub.ListShard(projectName, topicName)
if err != nil {
time.Sleep(1 * time.Microsecond)
continue
}
ok := true
for _, shard := range ls.Shards {
switch shard.State {
case ACTIVE, CLOSED:
continue
default:
ok = false
break
}
}
if ok {
break
}
}
ready <- true
}(datahub)
return <-ready
}
func (datahub *DataHub) ListTopic(projectName string) (*ListTopicResult, error) {
if !util.CheckProjectName(projectName) {
return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
}
path := fmt.Sprintf(topicsPath, projectName)
reqPara := &RequestParameter{
Header: map[string]string{httpHeaderContentType: httpJsonContent},
}
respBody, commonResp, err := datahub.Client.Get(path, reqPara)
if err != nil {
return nil, err
}
return NewListTopicResult(respBody, commonResp)
}
func (datahub *DataHub) ListTopicWithFilter(projectName, filter string) (*ListTopicResult, error) {
if !util.CheckProjectName(projectName) {
return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
}
path := fmt.Sprintf(topicsPath, projectName)
reqPara := &RequestParameter{
Header: map[string]string{httpHeaderContentType: httpJsonContent},
Query: map[string]string{httpFilterQuery: filter},
}
respBody, commonResp, err := datahub.Client.Get(path, reqPara)
if err != nil {
return nil, err
}
return NewListTopicResult(respBody, commonResp)
}
func (datahub *DataHub) CreateBlobTopic(projectName, topicName, comment string, shardCount, lifeCycle int) (*CreateBlobTopicResult, error) {
para := &CreateTopicParameter{
ShardCount: shardCount,
LifeCycle: lifeCycle,
Comment: comment,
RecordType: BLOB,
RecordSchema: nil,
ExpandMode: SPLIT_EXTEND,
}
ret, err := datahub.CreateTopicWithPara(projectName, topicName, para)
if err != nil {
return nil, err
}
return NewCreateBlobTopicResult(&ret.CommonResponseResult)
}
func (datahub *DataHub) CreateTupleTopic(projectName, topicName, comment string, shardCount, lifeCycle int, recordSchema *RecordSchema) (*CreateTupleTopicResult, error) {
para := &CreateTopicParameter{
ShardCount: shardCount,
LifeCycle: lifeCycle,
Comment: comment,
RecordType: TUPLE,
RecordSchema: recordSchema,
ExpandMode: SPLIT_EXTEND,
}
ret, err := datahub.CreateTopicWithPara(projectName, topicName, para)
if err != nil {
return nil, err
}
return NewCreateTupleTopicResult(&ret.CommonResponseResult)
}
func (datahub *DataHub) CreateTopicWithPara(projectName, topicName string, para *CreateTopicParameter) (*CreateTopicWithParaResult, error) {
if !util.CheckProjectName(projectName) {
return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
}
if !util.CheckTopicName(topicName) {
return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
}
if para == nil {
return nil, NewInvalidParameterErrorWithMessage(parameterNull)
}
if !util.CheckComment(para.Comment) {
return nil, NewInvalidParameterErrorWithMessage(commentInvalid)
}
if para.RecordType != TUPLE && para.RecordType != BLOB {
return nil, NewInvalidParameterErrorWithMessage(fmt.Sprintf("Invalid RecordType: %s", para.RecordType))
}
if para.RecordType == TUPLE && para.RecordSchema == nil {
return nil, NewInvalidParameterErrorWithMessage("Tuple topic must set RecordSchema")
}
if para.LifeCycle <= 0 {
return nil, NewInvalidParameterErrorWithMessage(lifecycleInvalid)
}
path := fmt.Sprintf(topicPath, projectName, topicName)
reqPara := &RequestParameter{
Header: map[string]string{httpHeaderContentType: httpJsonContent},
}
ctr := &CreateTopicRequest{
Action: "create",
ShardCount: para.ShardCount,
Lifecycle: para.LifeCycle,
RecordType: para.RecordType,
RecordSchema: para.RecordSchema,
Comment: para.Comment,
ExpandMode: para.ExpandMode,
}
_, commonResp, err := datahub.Client.Post(path, ctr, reqPara)
if err != nil {
return nil, err
}
return NewCreateTopicWithParaResult(commonResp)
}
func (datahub *DataHub) UpdateTopic(projectName, topicName, comment string) (*UpdateTopicResult, error) {
para := &UpdateTopicParameter{
Comment: comment,
}
return datahub.UpdateTopicWithPara(projectName, topicName, para)
}
// Update topic meta information. Only support comment and lifeCycle now.
func (datahub *DataHub) UpdateTopicWithPara(projectName, topicName string, para *UpdateTopicParameter) (*UpdateTopicResult, error) {
if !util.CheckProjectName(projectName) {
return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
}
if !util.CheckTopicName(topicName) {
return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
}
if para == nil {
return nil, NewInvalidParameterErrorWithMessage(parameterNull)
}
if !util.CheckComment(para.Comment) {
return nil, NewInvalidParameterErrorWithMessage(commentInvalid)
}
path := fmt.Sprintf(topicPath, projectName, topicName)
reqPara := &RequestParameter{
Header: map[string]string{httpHeaderContentType: httpJsonContent},
}
ut := &UpdateTopicRequest{
Lifecycle: para.LifeCycle,
Comment: para.Comment,
}
_, commonResp, err := datahub.Client.Put(path, ut, reqPara)
if err != nil {
return nil, err
}
return NewUpdateTopicResult(commonResp)
}
func (datahub *DataHub) DeleteTopic(projectName, topicName string) (*DeleteTopicResult, error) {
if !util.CheckProjectName(projectName) {
return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
}
if !util.CheckTopicName(topicName) {
return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
}
path := fmt.Sprintf(topicPath, projectName, topicName)
reqPara := &RequestParameter{
Header: map[string]string{httpHeaderContentType: httpJsonContent},
}
_, commonResp, err := datahub.Client.Delete(path, reqPara)
if err != nil {
return nil, err
}
return NewDeleteTopicResult(commonResp)
}
func (datahub *DataHub) GetTopic(projectName, topicName string) (*GetTopicResult, error) {
if !util.CheckProjectName(projectName) {
return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
}
if !util.CheckTopicName(topicName) {
return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
}
path := fmt.Sprintf(topicPath, projectName, topicName)
reqPara := &RequestParameter{
Header: map[string]string{httpHeaderContentType: httpJsonContent},
}
respBody, commonResp, err := datahub.Client.Get(path, reqPara)
if err != nil {
return nil, err
}
result, err := NewGetTopicResult(respBody, commonResp)
if err != nil {
return nil, err
}
result.ProjectName = projectName
result.TopicName = topicName
return result, nil
}
func (datahub *DataHub) ListShard(projectName, topicName string) (*ListShardResult, error) {
if !util.CheckProjectName(projectName) {
return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
}
if !util.CheckTopicName(topicName) {
return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
}
path := fmt.Sprintf(shardsPath, projectName, topicName)
reqPara := &RequestParameter{
Header: map[string]string{httpHeaderContentType: httpJsonContent},
}
respBody, commonResp, err := datahub.Client.Get(path, reqPara)
if err != nil {
return nil, err
}
return NewListShardResult(respBody, commonResp)
}
func (datahub *DataHub) SplitShard(projectName, topicName, shardId string) (*SplitShardResult, error) {
if !util.CheckProjectName(projectName) {
return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
}
if !util.CheckTopicName(topicName) {
return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
}
if !util.CheckShardId(shardId) {
return nil, NewInvalidParameterErrorWithMessage(shardIdInvalid)
}
splitKey, err := generateSpliteKey(projectName, topicName, shardId, datahub)
if err != nil {
return nil, err
}
path := fmt.Sprintf(shardsPath, projectName, topicName)
reqPara := &RequestParameter{
Header: map[string]string{httpHeaderContentType: httpJsonContent},
}
ssr := &SplitShardRequest{
Action: "split",
ShardId: shardId,
SplitKey: splitKey,
}
respBody, commonResp, err := datahub.Client.Post(path, ssr, reqPara)
if err != nil {
return nil, err
}
return NewSplitShardResult(respBody, commonResp)
}
func (datahub *DataHub) SplitShardBySplitKey(projectName, topicName, shardId, splitKey string) (*SplitShardResult, error) {
if !util.CheckProjectName(projectName) {
return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
}
if !util.CheckTopicName(topicName) {
return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
}
if !util.CheckShardId(shardId) {
return nil, NewInvalidParameterErrorWithMessage(shardIdInvalid)
}
path := fmt.Sprintf(shardsPath, projectName, topicName)
reqPara := &RequestParameter{
Header: map[string]string{httpHeaderContentType: httpJsonContent},
}
ssr := &SplitShardRequest{
Action: "split",
ShardId: shardId,
SplitKey: splitKey,
}
respBody, commonResp, err := datahub.Client.Post(path, ssr, reqPara)
if err != nil {
return nil, err
}
return NewSplitShardResult(respBody, commonResp)
}
func (datahub *DataHub) MergeShard(projectName, topicName, shardId, adjacentShardId string) (*MergeShardResult, error) {
if !util.CheckProjectName(projectName) {
return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
}
if !util.CheckTopicName(topicName) {
return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
}
if !util.CheckShardId(shardId) || !util.CheckShardId(adjacentShardId) {
return nil, NewInvalidParameterErrorWithMessage(shardIdInvalid)
}
path := fmt.Sprintf(shardsPath, projectName, topicName)
reqPara := &RequestParameter{
Header: map[string]string{httpHeaderContentType: httpJsonContent},
}
mss := &MergeShardRequest{
Action: "merge",
ShardId: shardId,
AdjacentShardId: adjacentShardId,
}
respBody, commonResp, err := datahub.Client.Post(path, mss, reqPara)
if err != nil {
return nil, err
}
return NewMergeShardResult(respBody, commonResp)
}
func (datahub *DataHub) ExtendShard(projectName, topicName string, shardCount int) (*ExtendShardResult, error) {
if !util.CheckProjectName(projectName) {
return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
}
if !util.CheckTopicName(topicName) {
return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
}
if shardCount <= 0 {
return nil, NewInvalidParameterErrorWithMessage("shardCount is invalid")
}
path := fmt.Sprintf(shardsPath, projectName, topicName)
reqPara := &RequestParameter{
Header: map[string]string{httpHeaderContentType: httpJsonContent},
}
mss := &ExtendShardRequest{
Action: "extend",
ExtendMode: "TO",
ShardCount: shardCount,
}
_, commonResp, err := datahub.Client.Post(path, mss, reqPara)
if err != nil {
return nil, err
}
return NewExtendShardResult(commonResp)
}
func (datahub *DataHub) GetCursor(projectName, topicName, shardId string, ctype CursorType, param ...int64) (*GetCursorResult, error) {
if !util.CheckProjectName(projectName) {
return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
}
if !util.CheckTopicName(topicName) {
return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
}
if !util.CheckShardId(shardId) {
return nil, NewInvalidParameterErrorWithMessage(shardIdInvalid)
}
if len(param) > 1 {
return nil, NewInvalidParameterErrorWithMessage(parameterNumInvalid)
}
path := fmt.Sprintf(shardPath, projectName, topicName, shardId)
reqPara := &RequestParameter{
Header: map[string]string{httpHeaderContentType: httpJsonContent},
}
gcr := &GetCursorRequest{
Action: "cursor",
CursorType: ctype,
}
switch ctype {
case OLDEST, LATEST:
if len(param) != 0 {
return nil, NewInvalidParameterErrorWithMessage("Not need extra parameter when CursorType OLDEST or LATEST")
}
case SYSTEM_TIME:
if len(param) != 1 {
return nil, NewInvalidParameterErrorWithMessage("Timestamp must be set when CursorType is SYSTEM_TIME")
}
gcr.SystemTime = param[0]
case SEQUENCE:
if len(param) != 1 {
return nil, NewInvalidParameterErrorWithMessage("Sequence must be set when CursorType is SEQUENCE")
}
gcr.Sequence = param[0]
}
respBody, commonResp, err := datahub.Client.Post(path, gcr, reqPara)
if err != nil {
return nil, err
}
return NewGetCursorResult(respBody, commonResp)
}
func (datahub *DataHub) PutRecords(projectName, topicName string, records []IRecord) (*PutRecordsResult, error) {
if !util.CheckProjectName(projectName) {
return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
}
if !util.CheckTopicName(topicName) {
return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
}
if len(records) == 0 {
return nil, NewInvalidParameterErrorWithMessage(recordsInvalid)
}
path := fmt.Sprintf(shardsPath, projectName, topicName)
reqPara := &RequestParameter{
Header: map[string]string{httpHeaderContentType: httpJsonContent},
}
prr := &PutRecordsRequest{
Action: "pub",
Records: records,
}
respBody, commonResp, err := datahub.Client.Post(path, prr, reqPara)
if err != nil {
return nil, err
}
return NewPutRecordsResult(respBody, commonResp)
}
func (datahub *DataHub) PutRecordsByShard(projectName, topicName, shardId string, records []IRecord) (*PutRecordsByShardResult, error) {
return nil, fmt.Errorf("not support this method")
}
func (datahub *DataHub) GetTupleRecords(projectName, topicName, shardId, cursor string, limit int, recordSchema *RecordSchema) (*GetRecordsResult, error) {
if !util.CheckProjectName(projectName) {
return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
}
if !util.CheckTopicName(topicName) {
return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
}
if !util.CheckShardId(shardId) {
return nil, NewInvalidParameterErrorWithMessage(shardIdInvalid)
}
if recordSchema == nil {
return nil, NewInvalidParameterErrorWithMessage(missingRecordSchema)
}
path := fmt.Sprintf(shardPath, projectName, topicName, shardId)
reqPara := &RequestParameter{
Header: map[string]string{httpHeaderContentType: httpJsonContent},
}
grr := &GetRecordRequest{
Action: "sub",
Cursor: cursor,
Limit: limit,
}
respBody, commonResp, err := datahub.Client.Post(path, grr, reqPara)
if err != nil {
return nil, err
}
ret, err := NewGetRecordsResult(respBody, recordSchema, commonResp)
if err != nil {
return nil, err
}
for _, record := range ret.Records {
if _, ok := record.(*TupleRecord); !ok {
return nil, NewInvalidParameterErrorWithMessage("shouldn't call this method for BLOB topic")
}
}
return ret, nil
}
func (datahub *DataHub) GetBlobRecords(projectName, topicName, shardId, cursor string, limit int) (*GetRecordsResult, error) {
if !util.CheckProjectName(projectName) {
return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
}
if !util.CheckTopicName(topicName) {
return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
}
if !util.CheckShardId(shardId) {
return nil, NewInvalidParameterErrorWithMessage(shardIdInvalid)
}
path := fmt.Sprintf(shardPath, projectName, topicName, shardId)
reqPara := &RequestParameter{
Header: map[string]string{httpHeaderContentType: httpJsonContent},
}
grr := &GetRecordRequest{
Action: "sub",
Cursor: cursor,
Limit: limit,
}
respBody, commonResp, err := datahub.Client.Post(path, grr, reqPara)
if err != nil {
return nil, err
}
return NewGetRecordsResult(respBody, nil, commonResp)
}
func (datahub *DataHub) AppendField(projectName, topicName string, field Field) (*AppendFieldResult, error) {
if !util.CheckProjectName(projectName) {
return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
}
if !util.CheckTopicName(topicName) {
return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
}
path := fmt.Sprintf(topicPath, projectName, topicName)
reqPara := &RequestParameter{
Header: map[string]string{httpHeaderContentType: httpJsonContent},
}
afr := &AppendFieldRequest{
Action: "AppendField",
FieldName: field.Name,
FieldType: field.Type,
}
_, commonResp, err := datahub.Client.Post(path, afr, reqPara)
if err != nil {
return nil, err
}
return NewAppendFieldResult(commonResp)
}
func (datahub *DataHub) GetMeterInfo(projectName, topicName, shardId string) (*GetMeterInfoResult, error) {
if !util.CheckProjectName(projectName) {
return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
}
if !util.CheckTopicName(topicName) {
return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
}
if !util.CheckShardId(shardId) {
return nil, NewInvalidParameterErrorWithMessage(shardIdInvalid)
}
path := fmt.Sprintf(shardPath, projectName, topicName, shardId)
reqPara := &RequestParameter{
Header: map[string]string{httpHeaderContentType: httpJsonContent},
}
gmir := &GetMeterInfoRequest{
Action: "meter",
}
respBody, commonResp, err := datahub.Client.Post(path, gmir, reqPara)
if err != nil {
return nil, err
}
return NewGetMeterInfoResult(respBody, commonResp)
}
func (datahub *DataHub) ListConnector(projectName, topicName string) (*ListConnectorResult, error) {
if !util.CheckProjectName(projectName) {
return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
}
if !util.CheckTopicName(topicName) {
return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
}
path := fmt.Sprintf(connectorsPath, projectName, topicName)
reqPara := &RequestParameter{
Header: map[string]string{httpHeaderContentType: httpJsonContent},
Query: map[string]string{httpHeaderConnectorMode: "id"},
}
respBody, commonResp, err := datahub.Client.Get(path, reqPara)
if err != nil {
return nil, err
}
return NewListConnectorResult(respBody, commonResp)
}
func (datahub *DataHub) CreateConnector(projectName, topicName string, cType ConnectorType, columnFields []string, config interface{}) (*CreateConnectorResult, error) {
return datahub.CreateConnectorWithStartTime(projectName, topicName, cType, columnFields, -1, config)
}
func (datahub *DataHub) CreateConnectorWithStartTime(projectName, topicName string, cType ConnectorType,
columnFields []string, sinkStartTime int64, config interface{}) (*CreateConnectorResult, error) {
para := &CreateConnectorParameter{
SinkStartTime: sinkStartTime,
ConnectorType: cType,
ColumnFields: columnFields,
ColumnNameMap: nil,
Config: config,
}
return datahub.CreateConnectorWithPara(projectName, topicName, para)
}
func (datahub *DataHub) CreateConnectorWithPara(projectName, topicName string, para *CreateConnectorParameter) (*CreateConnectorResult, error) {
if !util.CheckProjectName(projectName) {
return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
}
if !util.CheckTopicName(topicName) {
return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
}
if para == nil {
return nil, NewInvalidParameterErrorWithMessage(parameterNull)
}
if !validateConnectorType(para.ConnectorType) {
return nil, NewInvalidParameterErrorWithMessage(parameterTypeInvalid)
}
path := fmt.Sprintf(connectorPath, projectName, topicName, para.ConnectorType.String())
reqPara := &RequestParameter{
Header: map[string]string{httpHeaderContentType: httpJsonContent},
}
ccr := &CreateConnectorRequest{
Action: "create",
Type: para.ConnectorType,
SinkStartTime: para.SinkStartTime,
ColumnFields: para.ColumnFields,
ColumnNameMap: para.ColumnNameMap,
Config: para.Config,
}
respBody, commonResp, err := datahub.Client.Post(path, ccr, reqPara)
if err != nil {
return nil, err
}
return NewCreateConnectorResult(respBody, commonResp)
}
func (datahub *DataHub) GetConnector(projectName, topicName, connectorId string) (*GetConnectorResult, error) {
if !util.CheckProjectName(projectName) {
return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
}
if !util.CheckTopicName(topicName) {
return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
}
path := fmt.Sprintf(connectorPath, projectName, topicName, connectorId)
reqPara := &RequestParameter{
Header: map[string]string{httpHeaderContentType: httpJsonContent},
}
respBody, commonResp, err := datahub.Client.Get(path, reqPara)
if err != nil {
return nil, err
}
return NewGetConnectorResult(respBody, commonResp)
}
func (datahub *DataHub) UpdateConnector(projectName, topicName, connectorId string, config interface{}) (*UpdateConnectorResult, error) {
para := &UpdateConnectorParameter{
ColumnFields: nil,
ColumnNameMap: nil,
Config: config,
}
return datahub.UpdateConnectorWithPara(projectName, topicName, connectorId, para)
}
func (datahub *DataHub) UpdateConnectorWithPara(projectName, topicName, connectorId string, para *UpdateConnectorParameter) (*UpdateConnectorResult, error) {
if !util.CheckProjectName(projectName) {
return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
}
if !util.CheckTopicName(topicName) {
return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
}
if para == nil {
return nil, NewInvalidParameterErrorWithMessage(parameterNull)
}
path := fmt.Sprintf(connectorPath, projectName, topicName, connectorId)
reqPara := &RequestParameter{
Header: map[string]string{httpHeaderContentType: httpJsonContent},
}
ucr := &UpdateConnectorRequest{
Action: "updateconfig",
ColumnFields: para.ColumnFields,
ColumnNameMap: para.ColumnNameMap,
Config: para.Config,
}
_, commonResp, err := datahub.Client.Post(path, ucr, reqPara)
if err != nil {
return nil, err
}
return NewUpdateConnectorResult(commonResp)
}
func (datahub *DataHub) DeleteConnector(projectName, topicName, connectorId string) (*DeleteConnectorResult, error) {
if !util.CheckProjectName(projectName) {
return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
}
if !util.CheckTopicName(topicName) {
return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
}
path := fmt.Sprintf(connectorPath, projectName, topicName, connectorId)
reqPara := &RequestParameter{
Header: map[string]string{httpHeaderContentType: httpJsonContent},
}
_, commonResp, err := datahub.Client.Delete(path, reqPara)
if err != nil {
return nil, err
}
return NewDeleteConnectorResult(commonResp)
}
func (datahub *DataHub) GetConnectorDoneTime(projectName, topicName, connectorId string) (*GetConnectorDoneTimeResult, error) {
if !util.CheckProjectName(projectName) {
return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
}
if !util.CheckTopicName(topicName) {
return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
}
path := fmt.Sprintf(connectorPath, projectName, topicName, connectorId)
reqPara := &RequestParameter{
Header: map[string]string{httpHeaderContentType: httpJsonContent},
Query: map[string]string{"donetime": ""},
}
respBody, commonResp, err := datahub.Client.Get(path, reqPara)
if err != nil {
return nil, err
}
return NewGetConnectorDoneTimeResult(respBody, commonResp)
}
func (datahub *DataHub) GetConnectorShardStatus(projectName, topicName, connectorId string) (*GetConnectorShardStatusResult, error) {
if !util.CheckProjectName(projectName) {
return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
}
if !util.CheckTopicName(topicName) {
return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
}
path := fmt.Sprintf(connectorPath, projectName, topicName, connectorId)
reqPara := &RequestParameter{
Header: map[string]string{httpHeaderContentType: httpJsonContent},
}
gcss := &GetConnectorShardStatusRequest{
Action: "Status",
}
respBody, commonResp, err := datahub.Client.Post(path, gcss, reqPara)
if err != nil {
return nil, err
}
return NewGetConnectorShardStatusResult(respBody, commonResp)
}
func (datahub *DataHub) GetConnectorShardStatusByShard(projectName, topicName, connectorId, shardId string) (*GetConnectorShardStatusByShardResult, error) {
if !util.CheckProjectName(projectName) {
return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
}
if !util.CheckTopicName(topicName) {
return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
}
if !util.CheckShardId(shardId) {
return nil, NewInvalidParameterErrorWithMessage(shardIdInvalid)
}
path := fmt.Sprintf(connectorPath, projectName, topicName, connectorId)
reqPara := &RequestParameter{
Header: map[string]string{httpHeaderContentType: httpJsonContent},
}
gcss := &GetConnectorShardStatusRequest{
Action: "Status",
ShardId: shardId,
}
respBody, commonResp, err := datahub.Client.Post(path, gcss, reqPara)
if err != nil {
return nil, err
}
return NewGetConnectorShardStatusByShardResult(respBody, commonResp)
}
func (datahub *DataHub) ReloadConnector(projectName, topicName, connectorId string) (*ReloadConnectorResult, error) {
if !util.CheckProjectName(projectName) {
return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
}
if !util.CheckTopicName(topicName) {
return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
}
path := fmt.Sprintf(connectorPath, projectName, topicName, connectorId)
reqPara := &RequestParameter{
Header: map[string]string{httpHeaderContentType: httpJsonContent},
}
rcr := &ReloadConnectorRequest{
Action: "Reload",
}
_, commonResp, err := datahub.Client.Post(path, rcr, reqPara)
if err != nil {
return nil, err
}
return NewReloadConnectorResult(commonResp)
}
func (datahub *DataHub) ReloadConnectorByShard(projectName, topicName, connectorId, shardId string) (*ReloadConnectorByShardResult, error) {
if !util.CheckProjectName(projectName) {
return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
}
if !util.CheckTopicName(topicName) {
return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
}
if !util.CheckShardId(shardId) {
return nil, NewInvalidParameterErrorWithMessage(shardIdInvalid)
}
path := fmt.Sprintf(connectorPath, projectName, topicName, connectorId)
reqPara := &RequestParameter{
Header: map[string]string{httpHeaderContentType: httpJsonContent},
}
rcr := &ReloadConnectorRequest{
Action: "Reload",
ShardId: shardId,
}
_, commonResp, err := datahub.Client.Post(path, rcr, reqPara)
if err != nil {
return nil, err
}
return NewReloadConnectorByShardResult(commonResp)
}
func (datahub *DataHub) UpdateConnectorState(projectName, topicName, connectorId string, state ConnectorState) (*UpdateConnectorStateResult, error) {
if !util.CheckProjectName(projectName) {
return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
}
if !util.CheckTopicName(topicName) {
return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
}
if !validateConnectorState(state) {
return nil, NewInvalidParameterErrorWithMessage(parameterTypeInvalid)
}
path := fmt.Sprintf(connectorPath, projectName, topicName, connectorId)
reqPara := &RequestParameter{
Header: map[string]string{httpHeaderContentType: httpJsonContent},
}
ucsr := &UpdateConnectorStateRequest{
Action: "updatestate",
State: state,
}
_, commonResp, err := datahub.Client.Post(path, ucsr, reqPara)
if err != nil {
return nil, err
}
return NewUpdateConnectorStateResult(commonResp)
}
func (datahub *DataHub) UpdateConnectorOffset(projectName, topicName, connectorId, shardId string, offset ConnectorOffset) (*UpdateConnectorOffsetResult, error) {
if !util.CheckProjectName(projectName) {
return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
}
if !util.CheckTopicName(topicName) {
return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
}
if !util.CheckShardId(shardId) {
return nil, NewInvalidParameterErrorWithMessage(shardIdInvalid)
}
path := fmt.Sprintf(connectorPath, projectName, topicName, connectorId)
reqPara := &RequestParameter{
Header: map[string]string{httpHeaderContentType: httpJsonContent},
}
ucor := &UpdateConnectorOffsetRequest{
Action: "updateshardcontext",
ShardId: shardId,
Timestamp: offset.Timestamp,
Sequence: offset.Sequence,
}
_, commonResp, err := datahub.Client.Post(path, ucor, reqPara)
if err != nil {
return nil, err
}
return NewUpdateConnectorOffsetResult(commonResp)
}
func (datahub *DataHub) AppendConnectorField(projectName, topicName, connectorId, fieldName string) (*AppendConnectorFieldResult, error) {
if !util.CheckProjectName(projectName) {
return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
}
if !util.CheckTopicName(topicName) {
return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
}
path := fmt.Sprintf(connectorPath, projectName, topicName, connectorId)
reqPara := &RequestParameter{
Header: map[string]string{httpHeaderContentType: httpJsonContent},
}
acfr := &AppendConnectorFieldRequest{
Action: "appendfield",
FieldName: fieldName,
}
_, commonResp, err := datahub.Client.Post(path, acfr, reqPara)
if err != nil {
return nil, err
}
return NewAppendConnectorFieldResult(commonResp)
}
func (datahub *DataHub) ListSubscription(projectName, topicName string, pageIndex, pageSize int) (*ListSubscriptionResult, error) {
if !util.CheckProjectName(projectName) {
return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
}
if !util.CheckTopicName(topicName) {
return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
}
path := fmt.Sprintf(subscriptionsPath, projectName, topicName)
reqPara := &RequestParameter{
Header: map[string]string{httpHeaderContentType: httpJsonContent},
}
lsr := &ListSubscriptionRequest{
Action: "list",
PageIndex: pageIndex,
PageSize: pageSize,
}
respBody, commonResp, err := datahub.Client.Post(path, lsr, reqPara)
if err != nil {
return nil, err
}
return NewListSubscriptionResult(respBody, commonResp)
}
func (datahub *DataHub) CreateSubscription(projectName, topicName, comment string) (*CreateSubscriptionResult, error) {
if !util.CheckProjectName(projectName) {
return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
}
if !util.CheckTopicName(topicName) {
return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
}
if !util.CheckComment(comment) {
return nil, NewInvalidParameterErrorWithMessage(commentInvalid)
}
path := fmt.Sprintf(subscriptionsPath, projectName, topicName)
reqPara := &RequestParameter{
Header: map[string]string{httpHeaderContentType: httpJsonContent},
}
csr := &CreateSubscriptionRequest{
Action: "create",
Comment: comment,
}
respBody, commonResp, err := datahub.Client.Post(path, csr, reqPara)
if err != nil {
return nil, err
}
return NewCreateSubscriptionResult(respBody, commonResp)
}
func (datahub *DataHub) UpdateSubscription(projectName, topicName, subId, comment string) (*UpdateSubscriptionResult, error) {
if !util.CheckProjectName(projectName) {
return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
}
if !util.CheckTopicName(topicName) {
return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
}
if !util.CheckComment(comment) {
return nil, NewInvalidParameterErrorWithMessage(commentInvalid)
}
path := fmt.Sprintf(subscriptionPath, projectName, topicName, subId)
reqPara := &RequestParameter{
Header: map[string]string{httpHeaderContentType: httpJsonContent},
}
usr := &UpdateSubscriptionRequest{
Comment: comment,
}
_, commonResp, err := datahub.Client.Put(path, usr, reqPara)
if err != nil {
return nil, err
}
return NewUpdateSubscriptionResult(commonResp)
}
func (datahub *DataHub) DeleteSubscription(projectName, topicName, subId string) (*DeleteSubscriptionResult, error) {
if !util.CheckProjectName(projectName) {
return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
}
if !util.CheckTopicName(topicName) {
return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
}
path := fmt.Sprintf(subscriptionPath, projectName, topicName, subId)
reqPara := &RequestParameter{
Header: map[string]string{httpHeaderContentType: httpJsonContent},
}
_, commonResp, err := datahub.Client.Delete(path, reqPara)
if err != nil {
return nil, err
}
return NewDeleteSubscriptionResult(commonResp)
}
func (datahub *DataHub) GetSubscription(projectName, topicName, subId string) (*GetSubscriptionResult, error) {
if !util.CheckProjectName(projectName) {
return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
}
if !util.CheckTopicName(topicName) {
return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
}
path := fmt.Sprintf(subscriptionPath, projectName, topicName, subId)
reqPara := &RequestParameter{
Header: map[string]string{httpHeaderContentType: httpJsonContent},
}
respBody, commonResp, err := datahub.Client.Get(path, reqPara)
if err != nil {
return nil, err
}
return NewGetSubscriptionResult(respBody, commonResp)
}
func (datahub *DataHub) UpdateSubscriptionState(projectName, topicName, subId string, state SubscriptionState) (*UpdateSubscriptionStateResult, error) {
if !util.CheckProjectName(projectName) {
return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
}
if !util.CheckTopicName(topicName) {
return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
}
path := fmt.Sprintf(subscriptionPath, projectName, topicName, subId)
reqPara := &RequestParameter{
Header: map[string]string{httpHeaderContentType: httpJsonContent},
}
usr := &UpdateSubscriptionStateRequest{
State: state,
}
_, commonResp, err := datahub.Client.Put(path, usr, reqPara)
if err != nil {
return nil, err
}
return NewUpdateSubscriptionStateResult(commonResp)
}
func (datahub *DataHub) OpenSubscriptionSession(projectName, topicName, subId string, shardIds []string) (*OpenSubscriptionSessionResult, error) {
if !util.CheckProjectName(projectName) {
return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
}
if !util.CheckTopicName(topicName) {
return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
}
for _, id := range shardIds {
if !util.CheckShardId(id) {
return nil, NewInvalidParameterErrorWithMessage(shardIdInvalid)
}
}
path := fmt.Sprintf(offsetsPath, projectName, topicName, subId)
reqPara := &RequestParameter{
Header: map[string]string{httpHeaderContentType: httpJsonContent},
}
ossr := &OpenSubscriptionSessionRequest{
Action: "open",
ShardIds: shardIds,
}
respBody, commonResp, err := datahub.Client.Post(path, ossr, reqPara)
if err != nil {
return nil, err
}
return NewOpenSubscriptionSessionResult(respBody, commonResp)
}
func (datahub *DataHub) GetSubscriptionOffset(projectName, topicName, subId string, shardIds []string) (*GetSubscriptionOffsetResult, error) {
if !util.CheckProjectName(projectName) {
return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
}
if !util.CheckTopicName(topicName) {
return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
}
for _, id := range shardIds {
if !util.CheckShardId(id) {
return nil, NewInvalidParameterErrorWithMessage(shardIdInvalid)
}
}
path := fmt.Sprintf(offsetsPath, projectName, topicName, subId)
reqPara := &RequestParameter{
Header: map[string]string{httpHeaderContentType: httpJsonContent},
}
gsor := &GetSubscriptionOffsetRequest{
Action: "get",
ShardIds: shardIds,
}
respBody, commonResp, err := datahub.Client.Post(path, gsor, reqPara)
if err != nil {
return nil, err
}
return NewGetSubscriptionOffsetResult(respBody, commonResp)
}
func (datahub *DataHub) CommitSubscriptionOffset(projectName, topicName, subId string, offsets map[string]SubscriptionOffset) (*CommitSubscriptionOffsetResult, error) {
if !util.CheckProjectName(projectName) {
return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
}
if !util.CheckTopicName(topicName) {
return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
}
path := fmt.Sprintf(offsetsPath, projectName, topicName, subId)
reqPara := &RequestParameter{
Header: map[string]string{httpHeaderContentType: httpJsonContent},
}
req := &CommitSubscriptionOffsetRequest{
Action: "commit",
Offsets: offsets,
}
_, commonResp, err := datahub.Client.Put(path, req, reqPara)
if err != nil {
return nil, err
}
return NewCommitSubscriptionOffsetResult(commonResp)
}
func (datahub *DataHub) ResetSubscriptionOffset(projectName, topicName, subId string, offsets map[string]SubscriptionOffset) (*ResetSubscriptionOffsetResult, error) {
if !util.CheckProjectName(projectName) {
return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
}
if !util.CheckTopicName(topicName) {
return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
}
path := fmt.Sprintf(offsetsPath, projectName, topicName, subId)
reqPara := &RequestParameter{
Header: map[string]string{httpHeaderContentType: httpJsonContent},
}
req := &ResetSubscriptionOffsetRequest{
Action: "reset",
Offsets: offsets,
}
_, commonResp, err := datahub.Client.Put(path, req, reqPara)
if err != nil {
return nil, err
}
return NewResetSubscriptionOffsetResult(commonResp)
}
func (datahub *DataHub) Heartbeat(projectName, topicName, consumerGroup, consumerId string, versionId int64, holdShardList, readEndShardList []string) (*HeartbeatResult, error) {
if !util.CheckProjectName(projectName) {
return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
}
if !util.CheckTopicName(topicName) {
return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
}
for _, id := range holdShardList {
if !util.CheckShardId(id) {
return nil, NewInvalidParameterErrorWithMessage(shardIdInvalid)
}
}
for _, id := range readEndShardList {
if !util.CheckShardId(id) {
return nil, NewInvalidParameterErrorWithMessage(shardIdInvalid)
}
}
path := fmt.Sprintf(consumerGroupPath, projectName, topicName, consumerGroup)
reqPara := &RequestParameter{
Header: map[string]string{httpHeaderContentType: httpJsonContent},
}
hr := &HeartbeatRequest{
Action: "heartbeat",
ConsumerId: consumerId,
VersionId: versionId,
HoldShardList: holdShardList,
ReadEndShardList: readEndShardList,
}
respBody, commonResp, err := datahub.Client.Post(path, hr, reqPara)
if err != nil {
return nil, err
}
return NewHeartbeatResult(respBody, commonResp)
}
func (datahub *DataHub) JoinGroup(projectName, topicName, consumerGroup string, sessionTimeout int64) (*JoinGroupResult, error) {
if !util.CheckProjectName(projectName) {
return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
}
if !util.CheckTopicName(topicName) {
return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
}
path := fmt.Sprintf(consumerGroupPath, projectName, topicName, consumerGroup)
reqPara := &RequestParameter{
Header: map[string]string{httpHeaderContentType: httpJsonContent},
}
jgr := &JoinGroupRequest{
Action: "joinGroup",
SessionTimeout: sessionTimeout,
}
respBody, commonResp, err := datahub.Client.Post(path, jgr, reqPara)
if err != nil {
return nil, err
}
return NewJoinGroupResult(respBody, commonResp)
}
func (datahub *DataHub) SyncGroup(projectName, topicName, consumerGroup, consumerId string, versionId int64, releaseShardList, readEndShardList []string) (*SyncGroupResult, error) {
if !util.CheckProjectName(projectName) {
return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
}
if !util.CheckTopicName(topicName) {
return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
}
if len(releaseShardList) == 0 || len(readEndShardList) == 0 {
return nil, NewInvalidParameterErrorWithMessage(shardListInvalid)
}
for _, id := range releaseShardList {
if !util.CheckShardId(id) {
return nil, NewInvalidParameterErrorWithMessage(shardIdInvalid)
}
}
for _, id := range readEndShardList {
if !util.CheckShardId(id) {
return nil, NewInvalidParameterErrorWithMessage(shardIdInvalid)
}
}
path := fmt.Sprintf(consumerGroupPath, projectName, topicName, consumerGroup)
reqPara := &RequestParameter{
Header: map[string]string{httpHeaderContentType: httpJsonContent},
}
sgr := &SyncGroupRequest{
Action: "syncGroup",
ConsumerId: consumerId,
VersionId: versionId,
ReleaseShardList: releaseShardList,
ReadEndShardList: readEndShardList,
}
_, commonResp, err := datahub.Client.Post(path, sgr, reqPara)
if err != nil {
return nil, err
}
return NewSyncGroupResult(commonResp)
}
func (datahub *DataHub) LeaveGroup(projectName, topicName, consumerGroup, consumerId string, versionId int64) (*LeaveGroupResult, error) {
if !util.CheckProjectName(projectName) {
return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
}
if !util.CheckTopicName(topicName) {
return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
}
path := fmt.Sprintf(consumerGroupPath, projectName, topicName, consumerGroup)
reqPara := &RequestParameter{
Header: map[string]string{httpHeaderContentType: httpJsonContent},
}
lgr := &LeaveGroupRequest{
Action: "leaveGroup",
ConsumerId: consumerId,
VersionId: versionId,
}
_, commonResp, err := datahub.Client.Post(path, lgr, reqPara)
if err != nil {
return nil, err
}
return NewLeaveGroupResult(commonResp)
}
func (datahub *DataHub) ListTopicSchema(projectName, topicName string) (*ListTopicSchemaResult, error) {
if !util.CheckProjectName(projectName) {
return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
}
if !util.CheckTopicName(topicName) {
return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
}
path := fmt.Sprintf(topicPath, projectName, topicName)
reqPara := &RequestParameter{
Header: map[string]string{httpHeaderContentType: httpJsonContent},
}
lts := &ListTopicSchemaRequest{
Action: "ListSchema",
}
respBody, commonResp, err := datahub.Client.Post(path, lts, reqPara)
if err != nil {
return nil, err
}
return NewListTopicSchemaResult(respBody, commonResp)
}
func (datahub *DataHub) GetTopicSchemaByVersion(projectName, topicName string, versionId int) (*GetTopicSchemaResult, error) {
if !util.CheckProjectName(projectName) {
return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
}
if !util.CheckTopicName(topicName) {
return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
}
path := fmt.Sprintf(topicPath, projectName, topicName)
reqPara := &RequestParameter{
Header: map[string]string{httpHeaderContentType: httpJsonContent},
}
lts := &GetTopicSchemaRequest{
Action: "GetSchema",
VersionId: versionId,
RecordSchema: nil,
}
respBody, commonResp, err := datahub.Client.Post(path, lts, reqPara)
if err != nil {
return nil, err
}
return NewGetTopicSchemaResult(respBody, commonResp)
}
func (datahub *DataHub) GetTopicSchemaBySchema(projectName, topicName string, recordSchema *RecordSchema) (*GetTopicSchemaResult, error) {
if !util.CheckProjectName(projectName) {
return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
}
if !util.CheckTopicName(topicName) {
return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
}
path := fmt.Sprintf(topicPath, projectName, topicName)
reqPara := &RequestParameter{
Header: map[string]string{httpHeaderContentType: httpJsonContent},
}
lts := &GetTopicSchemaRequest{
Action: "GetSchema",
VersionId: -1,
RecordSchema: recordSchema,
}
respBody, commonResp, err := datahub.Client.Post(path, lts, reqPara)
if err != nil {
return nil, err
}
return NewGetTopicSchemaResult(respBody, commonResp)
}
func (datahub *DataHub) RegisterTopicSchema(projectName, topicName string, recordSchema *RecordSchema) (*RegisterTopicSchemaResult, error) {
if !util.CheckProjectName(projectName) {
return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
}
if !util.CheckTopicName(topicName) {
return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
}
path := fmt.Sprintf(topicPath, projectName, topicName)
reqPara := &RequestParameter{
Header: map[string]string{httpHeaderContentType: httpJsonContent},
}
lts := &RegisterTopicSchemaRequest{
Action: "RegisterSchema",
RecordSchema: recordSchema,
}
respBody, commonResp, err := datahub.Client.Post(path, lts, reqPara)
if err != nil {
return nil, err
}
return NewRegisterTopicSchemaResult(respBody, commonResp)
}
func (datahub *DataHub) DeleteTopicSchema(projectName, topicName string, versionId int) (*DeleteTopicSchemaResult, error) {
if !util.CheckProjectName(projectName) {
return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
}
if !util.CheckTopicName(topicName) {
return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
}
path := fmt.Sprintf(topicPath, projectName, topicName)
reqPara := &RequestParameter{
Header: map[string]string{httpHeaderContentType: httpJsonContent},
}
lts := &DeleteTopicSchemaRequest{
Action: "DeleteSchema",
VersionId: versionId,
}
_, commonResp, err := datahub.Client.Post(path, lts, reqPara)
if err != nil {
return nil, err
}
return NewDeleteTopicSchemaResult(commonResp)
}
func (datahub *DataHub) getSchemaRegistry() *schemaRegistryClient {
return datahub.schemaClient
}
type DataHubPB struct {
DataHub
}
func (datahub *DataHubPB) PutRecords(projectName, topicName string, records []IRecord) (*PutRecordsResult, error) {
if !util.CheckProjectName(projectName) {
return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
}
if !util.CheckTopicName(topicName) {
return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
}
path := fmt.Sprintf(shardsPath, projectName, topicName)
reqPara := &RequestParameter{
Header: map[string]string{
httpHeaderContentType: httpProtoContent,
httpHeaderRequestAction: httpPublistContent},
}
prr := &PutPBRecordsRequest{
Records: records,
}
respBody, commonResp, err := datahub.Client.Post(path, prr, reqPara)
if err != nil {
return nil, err
}
return NewPutPBRecordsResult(respBody, commonResp)
}
func (datahub *DataHubPB) PutRecordsByShard(projectName, topicName, shardId string, records []IRecord) (*PutRecordsByShardResult, error) {
if !util.CheckProjectName(projectName) {
return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
}
if !util.CheckTopicName(topicName) {
return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
}
if !util.CheckShardId(shardId) {
return nil, NewInvalidParameterErrorWithMessage(shardIdInvalid)
}
path := fmt.Sprintf(shardPath, projectName, topicName, shardId)
reqPara := &RequestParameter{
Header: map[string]string{
httpHeaderContentType: httpProtoContent,
httpHeaderRequestAction: httpPublistContent},
}
prr := &PutPBRecordsRequest{
Records: records,
}
_, commonResp, err := datahub.Client.Post(path, prr, reqPara)
if err != nil {
return nil, err
}
return NewPutRecordsByShardResult(commonResp)
}
func (datahub *DataHubPB) GetTupleRecords(projectName, topicName, shardId, cursor string, limit int, recordSchema *RecordSchema) (*GetRecordsResult, error) {
if !util.CheckProjectName(projectName) {
return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
}
if !util.CheckTopicName(topicName) {
return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
}
if !util.CheckShardId(shardId) {
return nil, NewInvalidParameterErrorWithMessage(shardIdInvalid)
}
path := fmt.Sprintf(shardPath, projectName, topicName, shardId)
reqPara := &RequestParameter{
Header: map[string]string{
httpHeaderContentType: httpProtoContent,
httpHeaderRequestAction: httpSubscribeContent},
}
grr := &GetPBRecordRequest{
Cursor: cursor,
Limit: limit,
}
respBody, commonResp, err := datahub.Client.Post(path, grr, reqPara)
if err != nil {
return nil, err
}
return NewGetPBRecordsResult(respBody, recordSchema, commonResp)
}
func (datahub *DataHubPB) GetBlobRecords(projectName, topicName, shardId, cursor string, limit int) (*GetRecordsResult, error) {
if !util.CheckProjectName(projectName) {
return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
}
if !util.CheckTopicName(topicName) {
return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
}
if !util.CheckShardId(shardId) {
return nil, NewInvalidParameterErrorWithMessage(shardIdInvalid)
}
path := fmt.Sprintf(shardPath, projectName, topicName, shardId)
reqPara := &RequestParameter{
Header: map[string]string{
httpHeaderContentType: httpProtoContent,
httpHeaderRequestAction: httpSubscribeContent},
}
grr := &GetPBRecordRequest{
Cursor: cursor,
Limit: limit,
}
respBody, commonResp, err := datahub.Client.Post(path, grr, reqPara)
if err != nil {
return nil, err
}
return NewGetPBRecordsResult(respBody, nil, commonResp)
}
type DataHubBatch struct {
DataHub
}
func (datahub *DataHubBatch) PutRecords(projectName, topicName string, records []IRecord) (*PutRecordsResult, error) {
return nil, fmt.Errorf("not support this method")
}
func (datahub *DataHubBatch) PutRecordsByShard(projectName, topicName, shardId string, records []IRecord) (*PutRecordsByShardResult, error) {
if !util.CheckProjectName(projectName) {
return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
}
if !util.CheckTopicName(topicName) {
return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
}
if !util.CheckShardId(shardId) {
return nil, NewInvalidParameterErrorWithMessage(shardIdInvalid)
}
path := fmt.Sprintf(shardPath, projectName, topicName, shardId)
reqPara := &RequestParameter{
Header: map[string]string{
httpHeaderContentType: httpProtoBatchContent,
httpHeaderRequestAction: httpPublistContent},
}
serializer := newBatchSerializer(projectName, topicName, datahub.cType, datahub.schemaClient)
prr := &PutBatchRecordsRequest{
serializer: serializer,
Records: records,
}
_, commonResp, err := datahub.Client.Post(path, prr, reqPara)
if err != nil {
return nil, err
}
return NewPutRecordsByShardResult(commonResp)
}
func (datahub *DataHubBatch) GetTupleRecords(projectName, topicName, shardId, cursor string, limit int, recordSchema *RecordSchema) (*GetRecordsResult, error) {
if !util.CheckProjectName(projectName) {
return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
}
if !util.CheckTopicName(topicName) {
return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
}
if !util.CheckShardId(shardId) {
return nil, NewInvalidParameterErrorWithMessage(shardIdInvalid)
}
path := fmt.Sprintf(shardPath, projectName, topicName, shardId)
reqPara := &RequestParameter{
Header: map[string]string{
httpHeaderContentType: httpProtoBatchContent,
httpHeaderRequestAction: httpSubscribeContent},
}
gbr := &GetBatchRecordRequest{
GetPBRecordRequest{
Cursor: cursor,
Limit: limit,
},
}
respBody, commonResp, err := datahub.Client.Post(path, gbr, reqPara)
if err != nil {
return nil, err
}
deserializer := newBatchDeserializer(projectName, topicName, shardId, recordSchema, datahub.schemaClient)
return NewGetBatchRecordsResult(respBody, recordSchema, commonResp, deserializer)
}
func (datahub *DataHubBatch) GetBlobRecords(projectName, topicName, shardId, cursor string, limit int) (*GetRecordsResult, error) {
return datahub.GetTupleRecords(projectName, topicName, shardId, cursor, limit, nil)
}