datahub/connector.go (866 lines of code) (raw):
package datahub
import (
"bytes"
"encoding/json"
"fmt"
"reflect"
"strconv"
)
type AuthMode string
const (
AK AuthMode = "ak"
STS AuthMode = "sts"
)
type ConnectorType string
const (
SinkOdps ConnectorType = "sink_odps"
SinkOss ConnectorType = "sink_oss"
SinkEs ConnectorType = "sink_es"
SinkAds ConnectorType = "sink_ads"
SinkMysql ConnectorType = "sink_mysql"
SinkFc ConnectorType = "sink_fc"
SinkOts ConnectorType = "sink_ots"
SinkDatahub ConnectorType = "sink_datahub"
SinkHologres ConnectorType = "sink_hologres"
)
func (ct *ConnectorType) String() string {
return string(*ct)
}
func validateConnectorType(ct ConnectorType) bool {
switch ct {
case SinkOdps, SinkOss, SinkEs, SinkAds, SinkMysql, SinkFc, SinkOts, SinkDatahub, SinkHologres:
return true
default:
return false
}
}
type ConnectorState string
const (
ConnectorStopped ConnectorState = "CONNECTOR_STOPPED"
ConnectorRunning ConnectorState = "CONNECTOR_RUNNING"
)
func validateConnectorState(ct ConnectorState) bool {
switch ct {
case ConnectorStopped, ConnectorRunning:
return true
default:
return false
}
}
type ConnectorTimestampUnit string
const (
ConnectorMicrosecond ConnectorTimestampUnit = "MICROSECOND"
ConnectorMillisecond ConnectorTimestampUnit = "MILLISECOND"
ConnectorSecond ConnectorTimestampUnit = "SECOND"
)
type ConnectorConfig struct {
TimestampUnit ConnectorTimestampUnit `json:"TimestampUnit"`
}
type PartitionMode string
const (
UserDefineMode PartitionMode = "USER_DEFINE"
SystemTimeMode PartitionMode = "SYSTEM_TIME"
EventTimeMode PartitionMode = "EVENT_TIME"
)
func (pm *PartitionMode) String() string {
return string(*pm)
}
func NewPartitionConfig() *PartitionConfig {
pc := &PartitionConfig{
ConfigMap: make([]map[string]string, 0),
}
return pc
}
type PartitionConfig struct {
ConfigMap []map[string]string
}
func (pc *PartitionConfig) AddConfig(key, value string) {
m := map[string]string{
key: value,
}
pc.ConfigMap = append(pc.ConfigMap, m)
}
func (pc *PartitionConfig) MarshalJSON() ([]byte, error) {
if pc == nil || len(pc.ConfigMap) == 0 {
return nil, nil
}
buf := &bytes.Buffer{}
buf.Write([]byte{'{'})
length := len(pc.ConfigMap)
for i, m := range pc.ConfigMap {
for k, v := range m {
if _, err := fmt.Fprintf(buf, "\"%s\":\"%s\"", k, v); err != nil {
return nil, fmt.Errorf("partition config is invalid")
}
}
if i < length-1 {
buf.WriteByte(',')
}
}
buf.WriteByte('}')
return buf.Bytes(), nil
}
func (pc *PartitionConfig) UnmarshalJSON(data []byte) error {
//the data is "xxxxxx",should convert to xxxx, remove the ""
var str *string = new(string)
if err := json.Unmarshal(data, str); err != nil {
return err
}
confParser := make([]map[string]string, 0)
if err := json.Unmarshal([]byte(*str), &confParser); err != nil {
return err
}
confMap := make([]map[string]string, len(confParser))
//convert {"key":"ds","value":"%Y%m%d",...} to {"ds":"%Y%m%d",...}
for i, m := range confParser {
confMap[i] = map[string]string{
m["key"]: m["value"],
}
}
pc.ConfigMap = confMap
return nil
}
/** ODPS CONFIG **/
type SinkOdpsConfig struct {
ConnectorConfig
Endpoint string `json:"OdpsEndpoint"`
Project string `json:"Project"`
Table string `json:"Table"`
AccessId string `json:"AccessId"`
AccessKey string `json:"AccessKey"`
TimeRange int `json:"TimeRange"`
TimeZone string `json:"TimeZone,omitempty"`
PartitionMode PartitionMode `json:"PartitionMode"`
PartitionConfig PartitionConfig `json:"PartitionConfig"`
TunnelEndpoint string `json:"TunnelEndpoint,omitempty"`
SplitKey string `json:"SplitKey,omitempty"`
Base64Encode bool `json:"Base64Encode,omitempty"`
}
func marshalCreateOdpsConnector(ccr *CreateConnectorRequest) ([]byte, error) {
soConf, ok := ccr.Config.(SinkOdpsConfig)
if !ok {
return nil, NewInvalidParameterErrorWithMessage(fmt.Sprintf("config type error,your input config type is %s,should be SinkOdpsConfig", reflect.TypeOf(ccr.Config)))
}
// set default value
if soConf.TimestampUnit == "" {
soConf.TimestampUnit = ConnectorMicrosecond
}
ct := &struct {
Action string `json:"Action"`
Type string `json:"Type"`
SinkStartTime int64 `json:"SinkStartTime"`
ColumnFields []string `json:"ColumnFields"`
ColumnNameMap map[string]string `json:"ColumnNameMap,omitempty"`
Config SinkOdpsConfig `json:"Config"`
}{
Action: ccr.Action,
Type: ccr.Type.String(),
SinkStartTime: ccr.SinkStartTime,
ColumnFields: ccr.ColumnFields,
ColumnNameMap: ccr.ColumnNameMap,
Config: soConf,
}
return json.Marshal(ct)
}
func unmarshalGetOdpsConnector(commonResp *CommonResponseResult, data []byte) (*GetConnectorResult, error) {
//the api return TimeRange is string, so need to convert to int64
type SinkOdpsConfigHelper struct {
SinkOdpsConfig
TimeRange string `json:"TimeRange"`
}
ct := &struct {
GetConnectorResult
Config SinkOdpsConfigHelper `json:"Config"`
}{}
if err := json.Unmarshal(data, ct); err != nil {
return nil, err
}
conf := ct.Config.SinkOdpsConfig
t, err := strconv.Atoi(ct.Config.TimeRange)
if err != nil {
return nil, err
}
conf.TimeRange = t
ret := &ct.GetConnectorResult
ret.Config = conf
ret.CommonResponseResult = *commonResp
return ret, nil
}
// no config update
func marshalUpdateConnector(ucr *UpdateConnectorRequest) ([]byte, error) {
ct := &struct {
Action string `json:"Action"`
ColumnFields []string `json:"ColumnFields,omitempty"`
ColumnNameMap map[string]string `json:"ColumnNameMap,omitempty"`
}{
Action: ucr.Action,
ColumnFields: ucr.ColumnFields,
ColumnNameMap: ucr.ColumnNameMap,
}
return json.Marshal(ct)
}
func marshalUpdateOdpsConnector(ucr *UpdateConnectorRequest) ([]byte, error) {
soConf, ok := ucr.Config.(SinkOdpsConfig)
if !ok {
return nil, NewInvalidParameterErrorWithMessage(fmt.Sprintf("config type error,your input config type is %s,should be SinkOdpsConfig", reflect.TypeOf(ucr.Config)))
}
// set default value
if soConf.TimestampUnit == "" {
soConf.TimestampUnit = ConnectorMicrosecond
}
ct := &struct {
Action string `json:"Action"`
ColumnFields []string `json:"ColumnFields,omitempty"`
ColumnNameMap map[string]string `json:"ColumnNameMap,omitempty"`
Config SinkOdpsConfig `json:"Config,omitempty"`
}{
Action: ucr.Action,
ColumnFields: ucr.ColumnFields,
ColumnNameMap: ucr.ColumnNameMap,
Config: soConf,
}
return json.Marshal(ct)
}
/* Oss Config */
type SinkOssConfig struct {
ConnectorConfig
Endpoint string `json:"Endpoint"`
Bucket string `json:"Bucket"`
Prefix string `json:"Prefix"`
TimeFormat string `json:"TimeFormat"`
TimeRange int `json:"TimeRange"`
AuthMode AuthMode `json:"AuthMode"`
AccessId string `json:"AccessId,omitempty"`
AccessKey string `json:"AccessKey,omitempty"`
MaxFileSize int64 `json:"MaxFileSize,omitempty"`
}
func marshalCreateOssConnector(ccr *CreateConnectorRequest) ([]byte, error) {
soConf, ok := ccr.Config.(SinkOssConfig)
if !ok {
return nil, NewInvalidParameterErrorWithMessage(fmt.Sprintf("config type error,your input config type is %s,should be SinkOssConfig", reflect.TypeOf(ccr.Config)))
}
// set default value
if soConf.TimestampUnit == "" {
soConf.TimestampUnit = ConnectorMicrosecond
}
ct := &struct {
Action string `json:"Action"`
Type ConnectorType `json:"Type"`
SinkStartTime int64 `json:"SinkStartTime"`
ColumnFields []string `json:"ColumnFields"`
ColumnNameMap map[string]string `json:"ColumnNameMap,omitempty"`
Config SinkOssConfig `json:"Config"`
}{
Action: "create",
Type: ccr.Type,
SinkStartTime: ccr.SinkStartTime,
ColumnFields: ccr.ColumnFields,
ColumnNameMap: ccr.ColumnNameMap,
Config: soConf,
}
return json.Marshal(ct)
}
func unmarshalGetOssConnector(commonResp *CommonResponseResult, data []byte) (*GetConnectorResult, error) {
type SinkOssConfigHelper struct {
SinkOssConfig
TimeRange string `json:"TimeRange"`
}
ct := &struct {
GetConnectorResult
Config SinkOssConfigHelper `json:"Config"`
}{}
if err := json.Unmarshal(data, ct); err != nil {
return nil, err
}
soConf := ct.Config.SinkOssConfig
t, err := strconv.Atoi(ct.Config.TimeRange)
if err != nil {
return nil, err
}
soConf.TimeRange = t
ret := &ct.GetConnectorResult
ret.Config = soConf
ret.CommonResponseResult = *commonResp
return ret, nil
}
func marshalUpdateOssConnector(ucr *UpdateConnectorRequest) ([]byte, error) {
soConf, ok := ucr.Config.(SinkOssConfig)
if !ok {
return nil, NewInvalidParameterErrorWithMessage(fmt.Sprintf("config type error,your input config type is %s,should be SinkOssConfig", reflect.TypeOf(ucr.Config)))
}
// set default value
if soConf.TimestampUnit == "" {
soConf.TimestampUnit = ConnectorMicrosecond
}
ct := &struct {
Action string `json:"Action"`
ColumnFields []string `json:"ColumnFields,omitempty"`
ColumnNameMap map[string]string `json:"ColumnNameMap,omitempty"`
Config SinkOssConfig `json:"Config,omitempty"`
}{
Action: "create",
ColumnFields: ucr.ColumnFields,
ColumnNameMap: ucr.ColumnNameMap,
Config: soConf,
}
return json.Marshal(ct)
}
/* mysql Config */
type SinkMysqlConfig struct {
ConnectorConfig
Host string `json:"Host"`
Port string `json:"Port"`
Database string `json:"Database"`
Table string `json:"Table"`
User string `json:"User"`
Password string `json:"Password"`
Ignore InsertMode `json:"Ignore"`
}
type InsertMode string
const (
IGNORE InsertMode = "true"
OVERWRITE InsertMode = "false"
)
func marshalCreateMysqlConnector(ccr *CreateConnectorRequest) ([]byte, error) {
soConf, ok := ccr.Config.(SinkMysqlConfig)
if !ok {
return nil, NewInvalidParameterErrorWithMessage(fmt.Sprintf("config type error,your input config type is %s,should be SinkMysqlConfig", reflect.TypeOf(ccr.Config)))
}
// set default value
if soConf.TimestampUnit == "" {
soConf.TimestampUnit = ConnectorMicrosecond
}
// set default value
if soConf.TimestampUnit == "" {
soConf.TimestampUnit = ConnectorMicrosecond
}
ct := &struct {
Action string `json:"Action"`
Type ConnectorType `json:"Type"`
SinkStartTime int64 `json:"SinkStartTime"`
ColumnFields []string `json:"ColumnFields"`
ColumnNameMap map[string]string `json:"ColumnNameMap,omitempty"`
Config SinkMysqlConfig `json:"Config"`
}{
Action: "create",
Type: ccr.Type,
SinkStartTime: ccr.SinkStartTime,
ColumnFields: ccr.ColumnFields,
ColumnNameMap: ccr.ColumnNameMap,
Config: soConf,
}
return json.Marshal(ct)
}
func unmarshalGetMysqlConnector(commonResp *CommonResponseResult, data []byte) (*GetConnectorResult, error) {
ct := &struct {
GetConnectorResult
Config SinkMysqlConfig `json:"Config"`
}{}
if err := json.Unmarshal(data, ct); err != nil {
return nil, err
}
ret := &ct.GetConnectorResult
ret.Config = ct.Config
ret.CommonResponseResult = *commonResp
return ret, nil
}
func marshalUpdateMysqlConnector(ucr *UpdateConnectorRequest) ([]byte, error) {
soConf, ok := ucr.Config.(SinkMysqlConfig)
if !ok {
return nil, NewInvalidParameterErrorWithMessage(fmt.Sprintf("config type error,your input config type is %s,should be SinkMysqlConfig", reflect.TypeOf(ucr.Config)))
}
// set default value
if soConf.TimestampUnit == "" {
soConf.TimestampUnit = ConnectorMicrosecond
}
ct := &struct {
Action string `json:"Action"`
ColumnFields []string `json:"ColumnFields,omitempty"`
ColumnNameMap map[string]string `json:"ColumnNameMap,omitempty"`
Config SinkMysqlConfig `json:"Config,omitempty"`
}{
Action: "create",
ColumnFields: ucr.ColumnFields,
ColumnNameMap: ucr.ColumnNameMap,
Config: soConf,
}
return json.Marshal(ct)
}
/* Ads Config */
type SinkAdsConfig struct {
SinkMysqlConfig
}
func marshalCreateAdsConnector(ccr *CreateConnectorRequest) ([]byte, error) {
soConf, ok := ccr.Config.(SinkAdsConfig)
if !ok {
return nil, NewInvalidParameterErrorWithMessage(fmt.Sprintf("config type error,your input config type is %s,should be SinkAdsConfig", reflect.TypeOf(ccr.Config)))
}
// set default value
if soConf.TimestampUnit == "" {
soConf.TimestampUnit = ConnectorMicrosecond
}
ct := &struct {
Action string `json:"Action"`
Type ConnectorType `json:"Type"`
SinkStartTime int64 `json:"SinkStartTime"`
ColumnFields []string `json:"ColumnFields"`
ColumnNameMap map[string]string `json:"ColumnNameMap,omitempty"`
Config SinkAdsConfig `json:"Config"`
}{
Action: "create",
Type: ccr.Type,
SinkStartTime: ccr.SinkStartTime,
ColumnFields: ccr.ColumnFields,
ColumnNameMap: ccr.ColumnNameMap,
Config: soConf,
}
return json.Marshal(ct)
}
func unmarshalGetAdsConnector(commonResp *CommonResponseResult, data []byte) (*GetConnectorResult, error) {
ct := &struct {
GetConnectorResult
Config SinkMysqlConfig `json:"Config"`
}{}
if err := json.Unmarshal(data, ct); err != nil {
return nil, err
}
ret := &ct.GetConnectorResult
ret.Config = ct.Config
ret.CommonResponseResult = *commonResp
return ret, nil
}
func marshalUpdateAdsConnector(ucr *UpdateConnectorRequest) ([]byte, error) {
soConf, ok := ucr.Config.(SinkAdsConfig)
if !ok {
return nil, NewInvalidParameterErrorWithMessage(fmt.Sprintf("config type error,your input config type is %s,should be SinkAdsConfig", reflect.TypeOf(ucr.Config)))
}
// set default value
if soConf.TimestampUnit == "" {
soConf.TimestampUnit = ConnectorMicrosecond
}
ct := &struct {
Action string `json:"Action"`
ColumnFields []string `json:"ColumnFields,omitempty"`
ColumnNameMap map[string]string `json:"ColumnNameMap,omitempty"`
Config SinkAdsConfig `json:"Config,omitempty"`
}{
Action: "create",
ColumnFields: ucr.ColumnFields,
ColumnNameMap: ucr.ColumnNameMap,
Config: soConf,
}
return json.Marshal(ct)
}
/* datahub Config */
type SinkDatahubConfig struct {
ConnectorConfig
Endpoint string `json:"Endpoint"`
Project string `json:"Project"`
Topic string `json:"Topic"`
AuthMode AuthMode `json:"AuthMode"`
AccessId string `json:"AccessId,omitempty"`
AccessKey string `json:"AccessKey,omitempty"`
}
func marshalCreateDatahubConnector(ccr *CreateConnectorRequest) ([]byte, error) {
soConf, ok := ccr.Config.(SinkDatahubConfig)
if !ok {
return nil, NewInvalidParameterErrorWithMessage(fmt.Sprintf("config type error,your input config type is %s,should be SinkDatahubConfig", reflect.TypeOf(ccr.Config)))
}
// set default value
if soConf.TimestampUnit == "" {
soConf.TimestampUnit = ConnectorMicrosecond
}
ct := &struct {
Action string `json:"Action"`
Type ConnectorType `json:"Type"`
SinkStartTime int64 `json:"SinkStartTime"`
ColumnFields []string `json:"ColumnFields"`
ColumnNameMap map[string]string `json:"ColumnNameMap,omitempty"`
Config SinkDatahubConfig `json:"Config"`
}{
Action: "create",
Type: ccr.Type,
SinkStartTime: ccr.SinkStartTime,
ColumnFields: ccr.ColumnFields,
ColumnNameMap: ccr.ColumnNameMap,
Config: soConf,
}
return json.Marshal(ct)
}
func unmarshalGetDatahubConnector(commonResp *CommonResponseResult, data []byte) (*GetConnectorResult, error) {
ct := &struct {
GetConnectorResult
Config SinkDatahubConfig `json:"Config"`
}{}
if err := json.Unmarshal(data, ct); err != nil {
return nil, err
}
ret := &ct.GetConnectorResult
ret.Config = ct.Config
ret.CommonResponseResult = *commonResp
return ret, nil
}
func marshalUpdateDatahubConnector(ucr *UpdateConnectorRequest) ([]byte, error) {
soConf, ok := ucr.Config.(SinkDatahubConfig)
if !ok {
return nil, NewInvalidParameterErrorWithMessage(fmt.Sprintf("config type error,your input config type is %s,should be SinkDatahubConfig", reflect.TypeOf(ucr.Config)))
}
// set default value
if soConf.TimestampUnit == "" {
soConf.TimestampUnit = ConnectorMicrosecond
}
ct := &struct {
Action string `json:"Action"`
ColumnFields []string `json:"ColumnFields,omitempty"`
ColumnNameMap map[string]string `json:"ColumnNameMap,omitempty"`
Config SinkDatahubConfig `json:"Config,omitempty"`
}{
Action: "create",
ColumnFields: ucr.ColumnFields,
ColumnNameMap: ucr.ColumnNameMap,
Config: soConf,
}
return json.Marshal(ct)
}
/* ES Config */
type SinkEsConfig struct {
ConnectorConfig
Index string `json:"Index"`
Endpoint string `json:"Endpoint"`
User string `json:"User"`
Password string `json:"Password"`
IDFields []string `json:"IDFields"`
TypeFields []string `json:"TypeFields"`
RouterFields []string `json:"RouterFields"`
ProxyMode bool `json:"ProxyMode"`
}
func marshalCreateEsConnector(ccr *CreateConnectorRequest) ([]byte, error) {
soConf, ok := ccr.Config.(SinkEsConfig)
if !ok {
return nil, NewInvalidParameterErrorWithMessage(fmt.Sprintf("config type error,your input config type is %s,should be SinkEsConfig", reflect.TypeOf(ccr.Config)))
}
// set default value
if soConf.TimestampUnit == "" {
soConf.TimestampUnit = ConnectorMicrosecond
}
// server need ProxyMode be string
type SinkEsConfigHelper struct {
SinkEsConfig
ProxyMode string `json:"ProxyMode"`
}
confHelper := SinkEsConfigHelper{
SinkEsConfig: soConf,
ProxyMode: strconv.FormatBool(soConf.ProxyMode),
}
ct := &struct {
Action string `json:"Action"`
Type ConnectorType `json:"Type"`
SinkStartTime int64 `json:"SinkStartTime"`
ColumnFields []string `json:"ColumnFields"`
ColumnNameMap map[string]string `json:"ColumnNameMap,omitempty"`
Config SinkEsConfigHelper `json:"Config"`
}{
Action: "create",
Type: ccr.Type,
SinkStartTime: ccr.SinkStartTime,
ColumnFields: ccr.ColumnFields,
ColumnNameMap: ccr.ColumnNameMap,
Config: confHelper,
}
return json.Marshal(ct)
}
func unmarshalGetEsConnector(commonResp *CommonResponseResult, data []byte) (*GetConnectorResult, error) {
type SinkEsConfigHelper struct {
SinkEsConfig
IDFields string `json:"IDFields"`
TypeFields string `json:"TypeFields"`
RouterFields string `json:"RouterFields"`
ProxyMode string `json:"ProxyMode"`
}
ct := &struct {
GetConnectorResult
Config SinkEsConfigHelper `json:"Config"`
}{}
if err := json.Unmarshal(data, ct); err != nil {
return nil, err
}
conf := ct.Config.SinkEsConfig
if ct.Config.ProxyMode != "" {
proxy, err := strconv.ParseBool(ct.Config.ProxyMode)
if err != nil {
return nil, err
}
conf.ProxyMode = proxy
}
idFields := make([]string, 0)
if ct.Config.IDFields != "" {
if err := json.Unmarshal([]byte(ct.Config.IDFields), &idFields); err != nil {
return nil, err
}
}
conf.IDFields = idFields
typeFields := make([]string, 0)
if ct.Config.TypeFields != "" {
if err := json.Unmarshal([]byte(ct.Config.TypeFields), &typeFields); err != nil {
return nil, err
}
conf.TypeFields = typeFields
}
conf.TypeFields = typeFields
routerFields := make([]string, 0)
if ct.Config.RouterFields != "" {
if err := json.Unmarshal([]byte(ct.Config.RouterFields), &routerFields); err != nil {
return nil, err
}
}
conf.RouterFields = routerFields
ret := &ct.GetConnectorResult
ret.CommonResponseResult = *commonResp
ret.Config = conf
return ret, nil
}
func marshalUpdateEsConnector(ucr *UpdateConnectorRequest) ([]byte, error) {
soConf, ok := ucr.Config.(SinkEsConfig)
if !ok {
return nil, NewInvalidParameterErrorWithMessage(fmt.Sprintf("config type error,your input config type is %s,should be SinkEsConfig", reflect.TypeOf(ucr.Config)))
}
// set default value
if soConf.TimestampUnit == "" {
soConf.TimestampUnit = ConnectorMicrosecond
}
ct := &struct {
Action string `json:"Action"`
ColumnFields []string `json:"ColumnFields,omitempty"`
ColumnNameMap map[string]string `json:"ColumnNameMap,omitempty"`
Config SinkEsConfig `json:"Config,omitempty"`
}{
Action: "create",
ColumnFields: ucr.ColumnFields,
ColumnNameMap: ucr.ColumnNameMap,
Config: soConf,
}
return json.Marshal(ct)
}
type FcInvokeType string
const (
FcSync FcInvokeType = "sync"
FcAsync FcInvokeType = "async"
)
/* FC Config */
type SinkFcConfig struct {
ConnectorConfig
Endpoint string `json:"Endpoint"`
Service string `json:"Service"`
Function string `json:"Function"`
AuthMode AuthMode `json:"AuthMode"`
AccessId string `json:"AccessId,omitempty"`
AccessKey string `json:"AccessKey,omitempty"`
InvokeType FcInvokeType `json:"InvokeType"`
}
func marshalCreateFcConnector(ccr *CreateConnectorRequest) ([]byte, error) {
soConf, ok := ccr.Config.(SinkFcConfig)
if !ok {
return nil, NewInvalidParameterErrorWithMessage(fmt.Sprintf("config type error,your input config type is %s,should be SinkFcConfig", reflect.TypeOf(ccr.Config)))
}
// set default value
if soConf.TimestampUnit == "" {
soConf.TimestampUnit = ConnectorMicrosecond
}
if soConf.InvokeType == "" {
soConf.InvokeType = FcSync
}
ct := &struct {
Action string `json:"Action"`
Type ConnectorType `json:"Type"`
SinkStartTime int64 `json:"SinkStartTime"`
ColumnFields []string `json:"ColumnFields"`
ColumnNameMap map[string]string `json:"ColumnNameMap,omitempty"`
Config SinkFcConfig `json:"Config"`
}{
Action: "create",
Type: ccr.Type,
SinkStartTime: ccr.SinkStartTime,
ColumnFields: ccr.ColumnFields,
ColumnNameMap: ccr.ColumnNameMap,
Config: soConf,
}
return json.Marshal(ct)
}
func unmarshalGetFcConnector(commonResp *CommonResponseResult, data []byte) (*GetConnectorResult, error) {
ct := &struct {
GetConnectorResult
Config SinkFcConfig `json:"Config"`
}{}
if err := json.Unmarshal(data, ct); err != nil {
return nil, err
}
ret := &ct.GetConnectorResult
ret.Config = ct.Config
ret.CommonResponseResult = *commonResp
return ret, nil
}
func marshalUpdateFcConnector(ucr *UpdateConnectorRequest) ([]byte, error) {
soConf, ok := ucr.Config.(SinkFcConfig)
if !ok {
return nil, NewInvalidParameterErrorWithMessage(fmt.Sprintf("config type error,your input config type is %s,should be SinkFcConfig", reflect.TypeOf(ucr.Config)))
}
// set default value
if soConf.TimestampUnit == "" {
soConf.TimestampUnit = ConnectorMicrosecond
}
ct := &struct {
Action string `json:"Action"`
ColumnFields []string `json:"ColumnFields,omitempty"`
ColumnNameMap map[string]string `json:"ColumnNameMap,omitempty"`
Config SinkFcConfig `json:"Config,omitempty"`
}{
Action: "create",
ColumnFields: ucr.ColumnFields,
ColumnNameMap: ucr.ColumnNameMap,
Config: soConf,
}
return json.Marshal(ct)
}
type OtsWriteMode string
const (
OtsPut OtsWriteMode = "PUT"
OtsUpdate OtsWriteMode = "UPDATE"
)
/* Ots Config */
type SinkOtsConfig struct {
ConnectorConfig
Endpoint string `json:"Endpoint"`
InstanceName string `json:"InstanceName"`
TableName string `json:"TableName"`
AuthMode AuthMode `json:"AuthMode"`
AccessId string `json:"AccessId,omitempty"`
AccessKey string `json:"AccessKey,omitempty"`
WriteMode OtsWriteMode `json:"WriteMode"`
}
func marshalCreateOtsConnector(ccr *CreateConnectorRequest) ([]byte, error) {
soConf, ok := ccr.Config.(SinkOtsConfig)
if !ok {
return nil, NewInvalidParameterErrorWithMessage(fmt.Sprintf("config type error,your input config type is %s,should be SinkOtsConfig", reflect.TypeOf(ccr.Config)))
}
// set default value
if soConf.TimestampUnit == "" {
soConf.TimestampUnit = ConnectorMicrosecond
}
if soConf.WriteMode == "" {
soConf.WriteMode = OtsPut
}
ct := &struct {
Action string `json:"Action"`
Type ConnectorType `json:"Type"`
SinkStartTime int64 `json:"SinkStartTime"`
ColumnFields []string `json:"ColumnFields"`
ColumnNameMap map[string]string `json:"ColumnNameMap,omitempty"`
Config SinkOtsConfig `json:"Config"`
}{
Action: "create",
Type: ccr.Type,
SinkStartTime: ccr.SinkStartTime,
ColumnFields: ccr.ColumnFields,
ColumnNameMap: ccr.ColumnNameMap,
Config: soConf,
}
return json.Marshal(ct)
}
func unmarshalGetOtsConnector(commonResp *CommonResponseResult, data []byte) (*GetConnectorResult, error) {
ct := &struct {
GetConnectorResult
Config SinkOtsConfig `json:"Config"`
}{}
if err := json.Unmarshal(data, ct); err != nil {
return nil, err
}
ret := &ct.GetConnectorResult
ret.Config = ct.Config
ret.CommonResponseResult = *commonResp
return ret, nil
}
func marshalUpdateOtsConnector(ucr *UpdateConnectorRequest) ([]byte, error) {
soConf, ok := ucr.Config.(SinkOtsConfig)
if !ok {
return nil, NewInvalidParameterErrorWithMessage(fmt.Sprintf("config type error,your input config type is %s,should be SinkMysqlConfig", reflect.TypeOf(ucr.Config)))
}
// set default value
if soConf.TimestampUnit == "" {
soConf.TimestampUnit = ConnectorMicrosecond
}
ct := &struct {
Action string `json:"Action"`
ColumnFields []string `json:"ColumnFields,omitempty"`
ColumnNameMap map[string]string `json:"ColumnNameMap,omitempty"`
Config SinkOtsConfig `json:"Config,omitempty"`
}{
Action: "create",
ColumnFields: ucr.ColumnFields,
ColumnNameMap: ucr.ColumnNameMap,
Config: soConf,
}
return json.Marshal(ct)
}
/* datahub Config */
type SinkHologresConfig struct {
SinkDatahubConfig
InstanceId string `json:"InstanceId,omitempty"`
}
func marshalCreateHologresConnector(ccr *CreateConnectorRequest) ([]byte, error) {
soConf, ok := ccr.Config.(SinkHologresConfig)
if !ok {
return nil, NewInvalidParameterErrorWithMessage(fmt.Sprintf("config type error,your input config type is %s,should be SinkHologresConfig", reflect.TypeOf(ccr.Config)))
}
// set default value
if soConf.TimestampUnit == "" {
soConf.TimestampUnit = ConnectorMicrosecond
}
ct := &struct {
Action string `json:"Action"`
Type ConnectorType `json:"Type"`
SinkStartTime int64 `json:"SinkStartTime"`
ColumnFields []string `json:"ColumnFields"`
ColumnNameMap map[string]string `json:"ColumnNameMap,omitempty"`
Config SinkHologresConfig `json:"Config"`
}{
Action: "create",
Type: ccr.Type,
SinkStartTime: ccr.SinkStartTime,
ColumnFields: ccr.ColumnFields,
ColumnNameMap: ccr.ColumnNameMap,
Config: soConf,
}
return json.Marshal(ct)
}
func unmarshalGetHologresConnector(commonResp *CommonResponseResult, data []byte) (*GetConnectorResult, error) {
ct := &struct {
GetConnectorResult
Config SinkHologresConfig `json:"Config"`
}{}
if err := json.Unmarshal(data, ct); err != nil {
return nil, err
}
ret := &ct.GetConnectorResult
ret.Config = ct.Config
ret.CommonResponseResult = *commonResp
return ret, nil
}
func marshalUpdateHologresConnector(ucr *UpdateConnectorRequest) ([]byte, error) {
soConf, ok := ucr.Config.(SinkHologresConfig)
if !ok {
return nil, NewInvalidParameterErrorWithMessage(fmt.Sprintf("config type error,your input config type is %s,should be SinkHologresConfig", reflect.TypeOf(ucr.Config)))
}
// set default value
if soConf.TimestampUnit == "" {
soConf.TimestampUnit = ConnectorMicrosecond
}
ct := &struct {
Action string `json:"Action"`
ColumnFields []string `json:"ColumnFields,omitempty"`
ColumnNameMap map[string]string `json:"ColumnNameMap,omitempty"`
Config SinkHologresConfig `json:"Config,omitempty"`
}{
Action: "create",
ColumnFields: ucr.ColumnFields,
ColumnNameMap: ucr.ColumnNameMap,
Config: soConf,
}
return json.Marshal(ct)
}
type ConnectorOffset struct {
Timestamp int64 `json:"Timestamp"`
Sequence int64 `json:"Sequence"`
}
type ConnectorShardState string
// Deprecated, will be removed in a future version
const (
Created ConnectorShardState = "CONTEXT_PLANNED"
Eexcuting ConnectorShardState = "CONTEXT_EXECUTING"
Stopped ConnectorShardState = "CONTEXT_PAUSED"
Finished ConnectorShardState = "CONTEXT_FINISHED"
)
const (
ConnectorShardHang ConnectorShardState = "CONTEXT_HANG"
ConnectorShardPlanned ConnectorShardState = "CONTEXT_PLANNED"
ConnectorShardExecuting ConnectorShardState = "CONTEXT_EXECUTING"
ConnectorShardStopped ConnectorShardState = "CONTEXT_STOPPED"
ConnectorShardFinished ConnectorShardState = "CONTEXT_FINISHED"
)
type ConnectorShardStatusEntry struct {
StartSequence int64 `json:"StartSequence"`
EndSequence int64 `json:"EndSequence"`
CurrentSequence int64 `json:"CurrentSequence"`
CurrentTimestamp int64 `json:"CurrentTimestamp"`
UpdateTime int64 `json:"UpdateTime"`
State ConnectorShardState `json:"State"`
LastErrorMessage string `json:"LastErrorMessage"`
DiscardCount int64 `json:"DiscardCount"`
DoneTime int64 `json:"DoneTime"`
WorkerAddress string `json:"WorkerAddress"`
}