alicloud/service_alicloud_ots.go (875 lines of code) (raw):

package alicloud import ( "encoding/json" "errors" "regexp" "strconv" "strings" util "github.com/alibabacloud-go/tea-utils/service" "github.com/aliyun/aliyun-tablestore-go-sdk/tablestore/search" otsTunnel "github.com/aliyun/aliyun-tablestore-go-sdk/tunnel" "time" "fmt" "github.com/aliyun/alibaba-cloud-sdk-go/sdk/requests" "github.com/aliyun/alibaba-cloud-sdk-go/services/ots" "github.com/aliyun/aliyun-tablestore-go-sdk/tablestore" "github.com/aliyun/terraform-provider-alicloud/alicloud/connectivity" "github.com/hashicorp/terraform-plugin-sdk/helper/resource" ) type OtsService struct { client *connectivity.AliyunClient } func (s *OtsService) getPrimaryKeyType(primaryKeyType string) tablestore.PrimaryKeyType { var keyType tablestore.PrimaryKeyType t := PrimaryKeyTypeString(primaryKeyType) switch t { case IntegerType: keyType = tablestore.PrimaryKeyType_INTEGER case StringType: keyType = tablestore.PrimaryKeyType_STRING case BinaryType: keyType = tablestore.PrimaryKeyType_BINARY } return keyType } func ParseDefinedColumnType(colType string) (tablestore.DefinedColumnType, error) { switch DefinedColumnTypeString(colType) { case DefinedColumnInteger: return tablestore.DefinedColumn_INTEGER, nil case DefinedColumnString: return tablestore.DefinedColumn_STRING, nil case DefinedColumnBinary: return tablestore.DefinedColumn_BINARY, nil case DefinedColumnDouble: return tablestore.DefinedColumn_DOUBLE, nil case DefinedColumnBoolean: return tablestore.DefinedColumn_BOOLEAN, nil } return 0, WrapError(fmt.Errorf("unsupported defined column type: %s", colType)) } func (s *OtsService) ListOtsTable(instanceName string) (table *tablestore.ListTableResponse, err error) { if _, err := s.DescribeOtsInstance(instanceName); err != nil { return nil, WrapError(err) } var raw interface{} var requestInfo *tablestore.TableStoreClient err = resource.Retry(5*time.Minute, func() *resource.RetryError { raw, err = s.client.WithTableStoreClient(instanceName, func(tableStoreClient *tablestore.TableStoreClient) (interface{}, error) { requestInfo = tableStoreClient return tableStoreClient.ListTable() }) if err != nil { if strings.HasSuffix(err.Error(), "no such host") { return resource.RetryableError(err) } return resource.NonRetryableError(err) } addDebug("ListTable", raw, requestInfo) return nil }) if err != nil { if strings.HasPrefix(err.Error(), "OTSObjectNotExist") { return table, WrapErrorf(err, NotFoundMsg, AliyunTablestoreGoSdk) } return nil, WrapErrorf(err, DataDefaultErrorMsg, instanceName, "ListTable", AliyunTablestoreGoSdk) } table, _ = raw.(*tablestore.ListTableResponse) if table == nil { return table, WrapErrorf(NotFoundErr("OtsTable", instanceName), NotFoundMsg, ProviderERROR) } return } func (s *OtsService) DescribeOtsTable(id string) (*tablestore.DescribeTableResponse, error) { table := &tablestore.DescribeTableResponse{} parts, err := ParseResourceId(id, 2) if err != nil { return table, WrapError(err) } instanceName, tableName := parts[0], parts[1] request := new(tablestore.DescribeTableRequest) request.TableName = tableName if _, err := s.DescribeOtsInstance(instanceName); err != nil { return table, WrapError(err) } var raw interface{} var requestInfo *tablestore.TableStoreClient err = resource.Retry(5*time.Minute, func() *resource.RetryError { raw, err = s.client.WithTableStoreClient(instanceName, func(tableStoreClient *tablestore.TableStoreClient) (interface{}, error) { requestInfo = tableStoreClient return tableStoreClient.DescribeTable(request) }) if err != nil { if IsExpectedErrors(err, OtsTableIsTemporarilyUnavailable) { return resource.RetryableError(err) } return resource.NonRetryableError(err) } addDebug("DescribeTable", raw, requestInfo, request) return nil }) if err != nil { if strings.HasPrefix(err.Error(), "OTSObjectNotExist") { return table, WrapErrorf(err, NotFoundMsg, AliyunTablestoreGoSdk) } return table, WrapErrorf(err, DefaultErrorMsg, id, "DescribeTable", AliyunTablestoreGoSdk) } table, _ = raw.(*tablestore.DescribeTableResponse) if table == nil || table.TableMeta == nil || table.TableMeta.TableName != tableName { return table, WrapErrorf(NotFoundErr("OtsTable", id), NotFoundMsg, ProviderERROR) } return table, nil } func (s *OtsService) WaitForOtsTable(instanceName, tableName string, status Status, timeout int) error { deadline := time.Now().Add(time.Duration(timeout) * time.Second) id := fmt.Sprintf("%s%s%s", instanceName, COLON_SEPARATED, tableName) for { object, err := s.DescribeOtsTable(id) if err != nil { if NotFoundError(err) { if status == Deleted { return nil } } else { return WrapError(err) } } if object.TableMeta.TableName == tableName && status != Deleted { return nil } if time.Now().After(deadline) { return WrapErrorf(err, WaitTimeoutMsg, id, GetFunc(1), timeout, object.TableMeta.TableName, tableName, ProviderERROR) } } } // Convert tablestore.PrimaryKeyType to PrimaryKeyTypeString func (s *OtsService) convertPrimaryKeyType(t tablestore.PrimaryKeyType) PrimaryKeyTypeString { var typeString PrimaryKeyTypeString switch t { case tablestore.PrimaryKeyType_INTEGER: typeString = IntegerType case tablestore.PrimaryKeyType_BINARY: typeString = BinaryType case tablestore.PrimaryKeyType_STRING: typeString = StringType } return typeString } func ConvertDefinedColumnType(t tablestore.DefinedColumnType) (DefinedColumnTypeString, error) { switch t { case tablestore.DefinedColumn_INTEGER: return DefinedColumnInteger, nil case tablestore.DefinedColumn_STRING: return DefinedColumnString, nil case tablestore.DefinedColumn_BINARY: return DefinedColumnBinary, nil case tablestore.DefinedColumn_DOUBLE: return DefinedColumnDouble, nil case tablestore.DefinedColumn_BOOLEAN: return DefinedColumnBoolean, nil } return "", WrapError(fmt.Errorf("unsupported defined column type: %v", t)) } func FindDefinedColumn(columns []*tablestore.DefinedColumnSchema, target *tablestore.DefinedColumnSchema) ColumnFindResult { for _, column := range columns { if column.Name == target.Name { if column.ColumnType == target.ColumnType { return ExistEqual } return ExistNotEqual } } return NotExist } type ColumnFindResult int32 const ( ExistEqual ColumnFindResult = 1 ExistNotEqual ColumnFindResult = 2 NotExist ColumnFindResult = 3 ) func (s *OtsService) ListOtsInstance(maxResults int) (allInstanceNames []string, err error) { actionPath := "/v2/openapi/listinstances" request := make(map[string]*string) request["RegionId"] = StringPointer(s.client.RegionId) request["MaxResults"] = StringPointer(strconv.Itoa(maxResults)) for { resp, err := OtsRestApiGetWithRetry(s.client, "tablestore", "2020-12-09", actionPath, request) if err != nil { return nil, WrapErrorf(err, DefaultErrorMsg, "alicloud_ots_instances", actionPath, AlibabaCloudSdkGoERROR) } addDebug(actionPath, resp, request) // resp struct: {"_headers": {...}, "body": {...}} respBody, ok := resp["body"].(map[string]interface{}) if !ok { return allInstanceNames, WrapErrorf(errors.New("parse resp body to map[string]interface{} failed"), DefaultErrorMsg, "instance:*", actionPath, AlibabaCloudSdkGoERROR) } // respBody["Instances"] struct: [{}, {}, {}] instanceMaps := respBody["Instances"] // Convert map to json string instancesJSON, err := json.Marshal(instanceMaps) if err != nil { return allInstanceNames, WrapErrorf(err, DefaultErrorMsg, "instance:*", actionPath, AlibabaCloudSdkGoERROR) } // Convert json string to obj var instances []RestOtsInstanceInfo if err := json.Unmarshal(instancesJSON, &instances); err != nil { return allInstanceNames, WrapErrorf(err, DefaultErrorMsg, "instance:*", actionPath, AlibabaCloudSdkGoERROR) } if instances == nil || len(instances) < 1 { break } for _, instance := range instances { allInstanceNames = append(allInstanceNames, instance.InstanceName) } nextToken, _ := resp["NextToken"].(string) if len(instances) < maxResults || nextToken == "" { break } else { request["NextToken"] = &nextToken } } return allInstanceNames, nil } func (s *OtsService) DescribeOtsInstance(instanceName string) (inst RestOtsInstanceInfo, err error) { actionPath := "/v2/openapi/getinstance" request := make(map[string]*string) request["RegionId"] = StringPointer(s.client.RegionId) request["InstanceName"] = StringPointer(instanceName) //client := meta.(*connectivity.AliyunClient) resp, err := OtsRestApiGetWithRetry(s.client, "tablestore", "2020-12-09", actionPath, request) if err != nil { if NotFoundError(err) { return inst, WrapErrorf(err, NotFoundMsg, AlibabaCloudSdkGoERROR) } return inst, WrapErrorf(err, DefaultErrorMsg, instanceName, actionPath, AlibabaCloudSdkGoERROR) } addDebug(actionPath, resp, request) // resp struct: {"_headers": {...}, "body": {...}} instMap := resp["body"] // Convert map to json string instJSON, err := json.Marshal(instMap) if err != nil { return inst, WrapErrorf(err, DefaultErrorMsg, instanceName, actionPath, AlibabaCloudSdkGoERROR) } // Convert json string to obj if err := json.Unmarshal(instJSON, &inst); err != nil { return inst, WrapErrorf(err, DefaultErrorMsg, instanceName, actionPath, AlibabaCloudSdkGoERROR) } if inst.InstanceName != instanceName { return inst, WrapErrorf(NotFoundErr("OtsInstance", instanceName), NotFoundMsg, ProviderERROR) } return inst, nil } func (s *OtsService) DescribeOtsInstanceAttachment(id string) (inst ots.VpcInfo, err error) { request := ots.CreateListVpcInfoByInstanceRequest() request.RegionId = s.client.RegionId request.Method = "GET" request.InstanceName = id raw, err := s.client.WithOtsClient(func(otsClient *ots.Client) (interface{}, error) { return otsClient.ListVpcInfoByInstance(request) }) if err != nil { if NotFoundError(err) { return inst, WrapErrorf(err, NotFoundMsg, AlibabaCloudSdkGoERROR) } return inst, WrapErrorf(err, DefaultErrorMsg, id, request.GetActionName(), AlibabaCloudSdkGoERROR) } addDebug(request.GetActionName(), raw, request.RpcRequest, request) resp, _ := raw.(*ots.ListVpcInfoByInstanceResponse) if resp.TotalCount < 1 { return inst, WrapErrorf(NotFoundErr("OtsInstanceAttachment", id), NotFoundMsg, ProviderERROR) } return resp.VpcInfos.VpcInfo[0], nil } func (s *OtsService) WaitForOtsInstanceVpc(id string, status Status, timeout int) error { deadline := time.Now().Add(time.Duration(timeout) * time.Second) for { object, err := s.DescribeOtsInstanceAttachment(id) if err != nil { if NotFoundError(err) { if status == Deleted { return nil } } else { return WrapError(err) } } if object.InstanceName == id && status != Deleted { return nil } if time.Now().After(deadline) { return WrapErrorf(err, WaitTimeoutMsg, id, GetFunc(1), timeout, object.InstanceName, id, ProviderERROR) } } } func (s *OtsService) ListOtsInstanceVpc(id string) (inst []ots.VpcInfo, err error) { request := ots.CreateListVpcInfoByInstanceRequest() request.RegionId = s.client.RegionId request.Method = "GET" request.InstanceName = id raw, err := s.client.WithOtsClient(func(otsClient *ots.Client) (interface{}, error) { return otsClient.ListVpcInfoByInstance(request) }) if err != nil { return inst, WrapErrorf(err, DataDefaultErrorMsg, "alicloud_ots_instance_attachments", request.GetActionName(), AlibabaCloudSdkGoERROR) } addDebug(request.GetActionName(), raw, request.RpcRequest, request) resp, _ := raw.(*ots.ListVpcInfoByInstanceResponse) if resp.TotalCount < 1 { return inst, WrapErrorf(NotFoundErr("OtsInstanceAttachment", id), NotFoundMsg, ProviderERROR) } var retInfos []ots.VpcInfo for _, vpcInfo := range resp.VpcInfos.VpcInfo { vpcInfo.InstanceName = id retInfos = append(retInfos, vpcInfo) } return retInfos, nil } func (s *OtsService) WaitForOtsInstance(id string, instanceInnerStatus string, timeout int) error { deadline := time.Now().Add(time.Duration(timeout) * time.Second) for { instance, err := s.DescribeOtsInstance(id) if err != nil { if NotFoundError(err) { if instanceInnerStatus == string(Deleted) { return nil } } else { return WrapError(err) } } if instance.InstanceStatus == instanceInnerStatus { break } if time.Now().After(deadline) { return WrapErrorf(err, WaitTimeoutMsg, id, GetFunc(1), timeout, fmt.Sprint(instance.InstanceStatus), instanceInnerStatus, ProviderERROR) } time.Sleep(DefaultIntervalShort * time.Second) } return nil } func (s *OtsService) DescribeOtsInstanceTypes() (types []string, err error) { request := ots.CreateListClusterTypeRequest() request.Method = requests.GET raw, err := s.client.WithOtsClient(func(otsClient *ots.Client) (interface{}, error) { return otsClient.ListClusterType(request) }) if err != nil { return nil, WrapErrorf(err, DefaultErrorMsg, "alicloud_ots_instance", request.GetActionName(), AlibabaCloudSdkGoERROR) } addDebug(request.GetActionName(), raw, request.RpcRequest, request) resp, _ := raw.(*ots.ListClusterTypeResponse) if resp != nil { return resp.ClusterTypeInfos.ClusterType, nil } return } func isOtsTunnelNotFound(err error) bool { if e, ok := err.(*otsTunnel.TunnelError); ok { if e.Code == otsTunnel.ErrCodeParamInvalid && strings.Contains(e.Message, "tunnel not exist") { return true } if e.Code == otsTunnel.ErrCodePermissionDenied && strings.Contains(e.Message, "Instance not found") { return true } } return false } func (s *OtsService) DescribeOtsTunnel(id string) (resp *otsTunnel.DescribeTunnelResponse, err error) { parts, err := ParseResourceId(id, 3) if err != nil { return nil, WrapError(err) } instanceName, tableName, tunnelName := parts[0], parts[1], parts[2] request := new(otsTunnel.DescribeTunnelRequest) request.TableName = tableName request.TunnelName = tunnelName var raw interface{} var requestInfo otsTunnel.TunnelClient err = resource.Retry(5*time.Minute, func() *resource.RetryError { raw, err = s.client.WithTableStoreTunnelClient(instanceName, func(tunnelClient otsTunnel.TunnelClient) (interface{}, error) { requestInfo = tunnelClient return tunnelClient.DescribeTunnel(request) }) if err != nil { if IsExpectedErrors(err, OtsTunnelIsTemporarilyUnavailable) { return resource.RetryableError(err) } return resource.NonRetryableError(err) } resp, _ := raw.(*otsTunnel.DescribeTunnelResponse) if resp != nil && resp.Tunnel != nil && resp.Tunnel.Stage == "InitBaseDataAndStreamShard" { return resource.RetryableError(WrapError(Error("ots tunnel is initial"))) } addDebug("DescribeTunnel", raw, requestInfo, request) return nil }) if err != nil { if isOtsTunnelNotFound(err) { return nil, WrapErrorf(err, NotFoundMsg, AliyunTablestoreGoSdk) } return nil, WrapErrorf(err, DefaultErrorMsg, id, "DescribeTunnel", AliyunTablestoreGoSdk) } resp, _ = raw.(*otsTunnel.DescribeTunnelResponse) if resp == nil || resp.Tunnel == nil || resp.Tunnel.TableName != tableName || resp.Tunnel.TunnelName != tunnelName { return nil, WrapErrorf(NotFoundErr("OtsTunnel", id), NotFoundMsg, ProviderERROR) } return resp, nil } func (s *OtsService) ListOtsTunnels(instanceName string, tableName string) (resp *otsTunnel.ListTunnelResponse, err error) { // check table exists id := ID(instanceName, tableName) if _, err := s.DescribeOtsTable(id); err != nil { return nil, WrapError(err) } var raw interface{} var requestInfo otsTunnel.TunnelClient request := new(otsTunnel.ListTunnelRequest) request.TableName = tableName err = resource.Retry(5*time.Minute, func() *resource.RetryError { raw, err = s.client.WithTableStoreTunnelClient(instanceName, func(tunnelClient otsTunnel.TunnelClient) (interface{}, error) { requestInfo = tunnelClient return tunnelClient.ListTunnel(request) }) if err != nil { if IsExpectedErrors(err, OtsTunnelIsTemporarilyUnavailable) { return resource.RetryableError(err) } return resource.NonRetryableError(err) } addDebug("ListTunnel", raw, requestInfo, request) return nil }) if err != nil { if isOtsTunnelNotFound(err) { return nil, WrapErrorf(err, NotFoundMsg, AliyunTablestoreGoSdk) } return nil, WrapErrorf(err, DefaultErrorMsg, id, "DescribeTunnel", AliyunTablestoreGoSdk) } resp, _ = raw.(*otsTunnel.ListTunnelResponse) if resp == nil { return nil, WrapErrorf(NotFoundErr("OtsTunnel", id), NotFoundMsg, ProviderERROR) } return resp, nil } func (s *OtsService) WaitForOtsTunnel(id string, status Status, timeout int) error { parts, err := ParseResourceId(id, 3) if err != nil { return WrapError(err) } deadline := time.Now().Add(time.Duration(timeout) * time.Second) for { object, err := s.DescribeOtsTunnel(id) if err != nil { if NotFoundError(err) { if status == Deleted { return nil } } else { return WrapError(err) } } if object.Tunnel.TunnelName == parts[2] && status != Deleted { return nil } if time.Now().After(deadline) { return WrapErrorf(err, WaitTimeoutMsg, id, GetFunc(1), timeout, object.Tunnel.TunnelName, parts[2], ProviderERROR) } } } func ID(segName ...string) string { return strings.Join(segName, COLON_SEPARATED) } func (s *OtsService) ListOtsSecondaryIndex(instanceName string, tableName string) ([]*tablestore.IndexMeta, error) { tableResp, err := s.DescribeOtsTable(ID(instanceName, tableName)) if err != nil { return nil, WrapError(err) } if tableResp == nil { return nil, nil } return tableResp.IndexMetas, nil } // DescribeOtsSecondaryIndex The describe method is depended on by AccTest, // and the second return value of the describe method needs `error` type func (s *OtsService) DescribeOtsSecondaryIndex(id string) (index *TableIndex, err error) { instanceName, tableName, indexName, _, err := ParseIndexId(id) if err != nil { return } tableResp, err := s.DescribeOtsTable(ID(instanceName, tableName)) if err != nil { return } if tableResp == nil { err = WrapError(fmt.Errorf("table not exist: %s", tableName)) return } for _, idx := range tableResp.IndexMetas { if idx.IndexName == indexName { index = &TableIndex{ InstanceName: instanceName, TableName: tableName, Index: idx, } return } } err = WrapError(fmt.Errorf("index not exist: %s.%s", tableName, indexName)) return } type TableIndex struct { InstanceName string TableName string Index *tablestore.IndexMeta } func ConvertSecIndexType(indexType tablestore.IndexType) (SecondaryIndexTypeString, error) { switch indexType { case tablestore.IT_GLOBAL_INDEX: return Global, nil case tablestore.IT_LOCAL_INDEX: return Local, nil default: return "", WrapError(fmt.Errorf("unexpected secondary index type: %v", indexType)) } } func ConvertSecIndexTypeString(typeStr SecondaryIndexTypeString) (tablestore.IndexType, error) { switch typeStr { case Global: return tablestore.IT_GLOBAL_INDEX, nil case Local: return tablestore.IT_LOCAL_INDEX, nil default: return 0, WrapError(fmt.Errorf("unexpected secondary index type: %v", typeStr)) } } type RegxFilter struct { regx *regexp.Regexp getSourceValue func(sourceObj interface{}) interface{} } func (f *RegxFilter) filter(sourceObj interface{}) bool { return f.regx.MatchString(f.getSourceValue(sourceObj).(string)) } type ValuesFilter struct { allowedValues []interface{} getSourceValue func(sourceObj interface{}) interface{} } func (f *ValuesFilter) filter(sourceObj interface{}) bool { for _, allowed := range f.allowedValues { if allowed != nil && allowed == f.getSourceValue(sourceObj) { return true } } // source value not in the enumerated values return false } type DataFilter interface { filter(sourceObj interface{}) bool } type InputDataSource struct { inputs []interface{} filters []DataFilter } func (ds *InputDataSource) doFilters() []interface{} { var outputs []interface{} for _, input := range ds.inputs { pass := true for _, filter := range ds.filters { if !filter.filter(input) { pass = false break } } if pass { outputs = append(outputs, input) } } return outputs } func (s *OtsService) LoopWaitTable(instanceName string, tableName string) (table *tablestore.DescribeTableResponse, err error) { err = resource.Retry(1*time.Minute, func() *resource.RetryError { t, e := s.DescribeOtsTable(ID(instanceName, tableName)) if e != nil { if NotFoundError(e) { return resource.RetryableError(e) } return resource.NonRetryableError(e) } table = t return nil }) return } func (s *OtsService) WaitForSecondaryIndex(instance string, table string, index string, status Status, timeout int) error { deadline := time.Now().Add(time.Duration(timeout) * time.Second) id := ID(instance, table) for { tableResp, err := s.DescribeOtsTable(id) if err != nil { if NotFoundError(err) { if status == Deleted { return nil } } } // table exists and index does not exist indexFind := IsSubCollection([]string{index}, simplifySecIndex(tableResp.IndexMetas)) switch { case status == Deleted, !indexFind: return nil case status != Deleted, indexFind: // Non-deleted states cannot be distinguished precisely. If the index exists, // it is considered that the index successfully matches the non-deleted state, and end waiting return nil default: break } if time.Now().After(deadline) { return WrapErrorf(err, WaitTimeoutMsg, id, GetFunc(1), timeout, tableResp.TableMeta.TableName, index, ProviderERROR) } } } // ParseIndexId both secondary index IDs and search index IDs will use this method, they consist of the same fields func ParseIndexId(indexId string) (instanceName, tableName, indexName, indexTypeStr string, err error) { splits := strings.Split(indexId, COLON_SEPARATED) if len(splits) >= 4 { instanceName = splits[0] tableName = splits[1] indexName = splits[2] indexTypeStr = splits[3] } else { err = WrapError(fmt.Errorf("invalid index id(instanceName:tableName:indexName:indexType): %s", indexId)) } return } func (s *OtsService) ListOtsSearchIndex(instanceName string, tableName string) (indexes []*tablestore.IndexInfo, err error) { // check table exists id := ID(instanceName, tableName) if _, err := s.DescribeOtsTable(id); err != nil { return nil, WrapError(err) } req := &tablestore.ListSearchIndexRequest{ TableName: tableName, } var raw interface{} var reqClient *tablestore.TableStoreClient err = resource.Retry(2*time.Minute, func() *resource.RetryError { raw, err = s.client.WithTableStoreClient(instanceName, func(tableStoreClient *tablestore.TableStoreClient) (interface{}, error) { reqClient = tableStoreClient return tableStoreClient.ListSearchIndex(req) }) if err != nil { if IsExpectedErrors(err, OtsSearchIndexIsTemporarilyUnavailable) { return resource.RetryableError(err) } return resource.NonRetryableError(err) } addDebug("ListSearchIndex", raw, reqClient, req) return nil }) if err != nil { if strings.HasPrefix(err.Error(), "OTSObjectNotExist") { return nil, WrapErrorf(err, NotFoundMsg, AliyunTablestoreGoSdk) } return nil, WrapErrorf(err, DefaultErrorMsg, id, "ListOtsSearchIndex", AliyunTablestoreGoSdk) } resp, _ := raw.(*tablestore.ListSearchIndexResponse) if resp == nil { return nil, WrapErrorf(NotFoundErr("SearchIndex", id), NotFoundMsg, ProviderERROR) } // IndexInfo slice can be nil when table not has search index return resp.IndexInfo, nil } func (s *OtsService) DescribeOtsSearchIndex(id string) (indexResp *tablestore.DescribeSearchIndexResponse, err error) { instanceName, tableName, indexName, _, err := ParseIndexId(id) if err != nil { return nil, WrapError(err) } if _, err = s.DescribeOtsInstance(instanceName); err != nil { return nil, WrapError(err) } if _, err := s.DescribeOtsTable(ID(instanceName, tableName)); err != nil { if NotFoundError(err) { return nil, WrapError(err) } } req := &tablestore.DescribeSearchIndexRequest{ TableName: tableName, IndexName: indexName, } var raw interface{} var reqClient *tablestore.TableStoreClient err = resource.Retry(2*time.Minute, func() *resource.RetryError { raw, err = s.client.WithTableStoreClient(instanceName, func(tableStoreClient *tablestore.TableStoreClient) (interface{}, error) { reqClient = tableStoreClient return tableStoreClient.DescribeSearchIndex(req) }) defer func() { addDebug("DescribeSearchIndex", raw, reqClient, req) }() if err != nil { if IsExpectedErrors(err, OtsSearchIndexIsTemporarilyUnavailable) { return resource.RetryableError(err) } return resource.NonRetryableError(err) } return nil }) if err != nil { if strings.HasPrefix(err.Error(), "OTSObjectNotExist") { return nil, WrapErrorf(err, NotFoundMsg, AliyunTablestoreGoSdk) } return nil, WrapErrorf(err, DefaultErrorMsg, id, "DescribeSearchIndex", AliyunTablestoreGoSdk) } indexResp, _ = raw.(*tablestore.DescribeSearchIndexResponse) if indexResp == nil || indexResp.SyncStat == nil || indexResp.Schema == nil { return nil, WrapErrorf(NotFoundErr("OtsSearchIndex", id), NotFoundMsg, ProviderERROR) } return indexResp, nil } func ConvertSearchIndexSyncPhase(syncPhase tablestore.SyncPhase) (OtsSearchIndexSyncPhaseString, error) { switch syncPhase { case tablestore.SyncPhase_FULL: return Full, nil case tablestore.SyncPhase_INCR: return Incr, nil default: return "", WrapError(fmt.Errorf("unexpected search index sync phase: %v", syncPhase)) } } func ConvertSearchIndexFieldTypeString(typeStr SearchIndexFieldTypeString) (tablestore.FieldType, error) { switch typeStr { case "Long": return tablestore.FieldType_LONG, nil case "Double": return tablestore.FieldType_DOUBLE, nil case "Boolean": return tablestore.FieldType_BOOLEAN, nil case "Keyword": return tablestore.FieldType_KEYWORD, nil case "Text": return tablestore.FieldType_TEXT, nil case "Date": return tablestore.FieldType_DATE, nil case "GeoPoint": return tablestore.FieldType_GEO_POINT, nil case "Nested": return tablestore.FieldType_NESTED, nil default: return 0, WrapError(fmt.Errorf("unexpected search index field type string: %s", typeStr)) } } func ConvertSearchIndexAnalyzerTypeString(typeStr SearchIndexAnalyzerTypeString) (tablestore.Analyzer, error) { switch typeStr { case "SingleWord": return tablestore.Analyzer_SingleWord, nil case "Split": return tablestore.Analyzer_Split, nil case "MinWord": return tablestore.Analyzer_MinWord, nil case "MaxWord": return tablestore.Analyzer_MaxWord, nil case "Fuzzy": return tablestore.Analyzer_Fuzzy, nil default: return "", WrapError(fmt.Errorf("unexpected search index analyzer type string: %s", typeStr)) } } func ConvertSearchIndexSortFieldTypeString(typeStr SearchIndexSortFieldTypeString) (search.Sorter, error) { switch typeStr { case "PrimaryKeySort": return &search.PrimaryKeySort{}, nil case "FieldSort": return &search.FieldSort{}, nil default: return nil, WrapError(fmt.Errorf("unexpected search index sort field type string [PrimaryKeySort|FieldSort]: %s", typeStr)) } } func ConvertSearchIndexOrderTypeString(typeStr SearchIndexOrderTypeString) (search.SortOrder, error) { switch typeStr { case "Asc": return search.SortOrder_ASC, nil case "Desc": return search.SortOrder_DESC, nil default: return 0, WrapError(fmt.Errorf("unexpected search index sort order string [Asc|Desc]: %s", typeStr)) } } func ConvertSearchIndexSortModeString(typeStr SearchIndexSortModeString) (search.SortMode, error) { switch typeStr { case "Min": return search.SortMode_Min, nil case "Max": return search.SortMode_Max, nil case "Avg": return search.SortMode_Avg, nil default: return 0, WrapError(fmt.Errorf("unexpected search index sort mode string [Min|Max|Avg]: %s", typeStr)) } } func (s *OtsService) WaitForSearchIndex(instance string, table string, indexName string, status Status, timeout int) error { deadline := time.Now().Add(time.Duration(timeout) * time.Second) id := ID(instance, table, indexName, SearchIndexTypeHolder) for { index, err := s.DescribeOtsSearchIndex(id) if err != nil { if NotFoundError(err) { if status == Deleted { return nil } } else { return WrapError(err) } } if index.SyncStat != nil && status != Deleted { return nil } if time.Now().After(deadline) { return WrapErrorf(err, WaitTimeoutMsg, id, GetFunc(1), timeout, table, indexName, ProviderERROR) } } } func (s *OtsService) DeleteSearchIndex(instanceName string, tableName string, indexName string) error { request := &tablestore.DeleteSearchIndexRequest{ TableName: tableName, IndexName: indexName, } err := resource.Retry(2*time.Minute, func() *resource.RetryError { var requestCli *tablestore.TableStoreClient raw, err := s.client.WithTableStoreClient(instanceName, func(tableStoreClient *tablestore.TableStoreClient) (interface{}, error) { requestCli = tableStoreClient return tableStoreClient.DeleteSearchIndex(request) }) defer func() { addDebug("DeleteSearchIndex", raw, requestCli, request) }() if err != nil { if IsExpectedErrors(err, OtsTableIsTemporarilyUnavailable) { return resource.RetryableError(err) } return resource.NonRetryableError(err) } return nil }) return err } // OtsRestApiPostWithRetry send POST request by CommonSDK(roa/restful) with retry. // This method directly passes OpenAPI parameters such as product and version, without relying on SDK version upgrades. // Retry policy: 3, 3+5, 3+5+5…, retry timeout: d.Timeout(schema.TimeoutCreate) // product is openapi product code, version is openapi version, actionPath is restful openapi backend api path, requestBody is request body content func OtsRestApiPostWithRetry(client *connectivity.AliyunClient, product string, version string, actionPath string, requestBody map[string]interface{}) (map[string]interface{}, error) { return invokeOtsRestApiWithRetry(client, product, version, actionPath, "POST", nil, nil, requestBody) } // OtsRestApiGetWithRetry send GET request by CommonSDK(roa/restful) with retry. // This method directly passes OpenAPI parameters such as product and version, without relying on SDK version upgrades. // Retry policy: 3, 3+5, 3+5+5…, retry timeout: d.Timeout(schema.TimeoutCreate) // product is openapi product code, version is openapi version, actionPath is restful openapi backend api path, urlQuery is url param func OtsRestApiGetWithRetry(client *connectivity.AliyunClient, product string, version string, actionPath string, urlQuery map[string]*string) (map[string]interface{}, error) { return invokeOtsRestApiWithRetry(client, product, version, actionPath, "GET", urlQuery, nil, nil) } func invokeOtsRestApiWithRetry(client *connectivity.AliyunClient, product string, version string, actionPath string, httpMethod string, urlQuery map[string]*string, headers map[string]*string, requestBody map[string]interface{}) (map[string]interface{}, error) { var response map[string]interface{} otsClient, err := client.NewOtsRoaClient(product) if err != nil { return nil, WrapError(err) } wait := incrementalWait(3*time.Second, 5*time.Second) err = resource.Retry(20*time.Minute, func() *resource.RetryError { response, err = otsClient.DoRequest(StringPointer(version), nil, StringPointer(httpMethod), StringPointer("AK"), StringPointer(actionPath), urlQuery, headers, requestBody, &util.RuntimeOptions{}) if err != nil { if IsExpectedErrors(err, OtsTableIsTemporarilyUnavailable) || IsExpectedErrors(err, OtsTunnelIsTemporarilyUnavailable) || IsExpectedErrors(err, OtsSecondaryIndexIsTemporarilyUnavailable) || IsExpectedErrors(err, OtsSearchIndexIsTemporarilyUnavailable) || NeedRetry(err) { wait() return resource.RetryableError(err) } return resource.NonRetryableError(err) } addDebug(actionPath, response, requestBody) return nil }) if err != nil { return nil, WrapErrorf(err, DefaultErrorMsg, product, actionPath, AlibabaCloudSdkGoERROR) } return response, nil } // ACLString2Slice aclPattern: A,B,C func ACLString2Slice(aclStr string) (s []string) { if aclStr == "" { return s } return strings.Split(aclStr, ",") }