alicloud/resource_alicloud_oss_bucket_replication.go (513 lines of code) (raw):
package alicloud
import (
"encoding/xml"
"fmt"
"log"
"net/http"
"time"
"github.com/hashicorp/terraform-plugin-sdk/helper/resource"
"github.com/hashicorp/terraform-plugin-sdk/helper/validation"
"github.com/aliyun/aliyun-oss-go-sdk/oss"
"github.com/aliyun/terraform-provider-alicloud/alicloud/connectivity"
"github.com/hashicorp/terraform-plugin-sdk/helper/schema"
)
func resourceAlicloudOssBucketReplication() *schema.Resource {
return &schema.Resource{
Create: resourceAlicloudOssBucketReplicationCreate,
Read: resourceAlicloudOssBucketReplicationRead,
Delete: resourceAlicloudOssBucketReplicationDelete,
Importer: &schema.ResourceImporter{
State: schema.ImportStatePassthrough,
},
Timeouts: &schema.ResourceTimeout{
Delete: schema.DefaultTimeout(30 * time.Minute),
},
Schema: map[string]*schema.Schema{
"bucket": {
Type: schema.TypeString,
Required: true,
ForceNew: true,
},
"rule_id": {
Type: schema.TypeString,
Computed: true,
},
"status": {
Type: schema.TypeString,
Computed: true,
},
"prefix_set": {
Type: schema.TypeList,
Optional: true,
ForceNew: true,
MaxItems: 1,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"prefixes": {
Type: schema.TypeList,
Required: true,
ForceNew: true,
MaxItems: 10,
Elem: &schema.Schema{Type: schema.TypeString, ValidateFunc: validation.StringLenBetween(0, 1023)},
},
},
},
},
"action": {
Type: schema.TypeString,
Optional: true,
ForceNew: true,
},
"destination": {
Type: schema.TypeList,
Required: true,
ForceNew: true,
MaxItems: 1,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"bucket": {
Type: schema.TypeString,
Required: true,
ForceNew: true,
},
"location": {
Type: schema.TypeString,
Required: true,
ForceNew: true,
},
"transfer_type": {
Type: schema.TypeString,
Optional: true,
ForceNew: true,
},
},
},
},
"historical_object_replication": {
Type: schema.TypeString,
Optional: true,
ForceNew: true,
ValidateFunc: validation.StringInSlice([]string{
"enabled",
"disabled",
}, false),
},
"sync_role": {
Type: schema.TypeString,
Optional: true,
ForceNew: true,
},
"source_selection_criteria": {
Type: schema.TypeList,
Optional: true,
ForceNew: true,
MaxItems: 1,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"sse_kms_encrypted_objects": {
Type: schema.TypeList,
Optional: true,
ForceNew: true,
MaxItems: 1,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"status": {
Type: schema.TypeString,
Optional: true,
ForceNew: true,
ValidateFunc: validation.StringInSlice([]string{
"Enabled",
"Disabled",
}, false),
},
},
},
},
},
},
},
"encryption_configuration": {
Type: schema.TypeList,
Optional: true,
ForceNew: true,
MaxItems: 1,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"replica_kms_key_id": {
Type: schema.TypeString,
Required: true,
ForceNew: true,
},
},
},
},
"progress": {
Type: schema.TypeList,
Optional: true,
Computed: true,
MaxItems: 1,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"historical_object": {
Type: schema.TypeString,
Computed: true,
},
"new_object": {
Type: schema.TypeString,
Computed: true,
},
},
},
},
},
}
}
type PrefixSetType struct {
Prefixes []string `xml:"Prefix"`
}
type DestinationType struct {
Bucket string `xml:"Bucket"`
Location string `xml:"Location"`
TransferType string `xml:"TransferType,omitempty"`
}
type SseKmsEncryptedObjectsType struct {
Status string `xml:"Status"`
}
type SourceSelectionCriteriaType struct {
SseKmsEncryptedObjects *SseKmsEncryptedObjectsType `xml:"SseKmsEncryptedObjects,omitempty"`
}
type EncryptionConfigurationType struct {
ReplicaKmsKeyID string `xml:"ReplicaKmsKeyID,omitempty"`
}
type ReplicationRule struct {
ID string `xml:"ID,omitempty"`
Action string `xml:"Action,omitempty"`
PrefixSet *PrefixSetType `xml:"PrefixSet,omitempty"`
Destination *DestinationType `xml:"Destination"`
HistoricalObjectReplication string `xml:"HistoricalObjectReplication,omitempty"`
Status string `xml:"Status,omitempty"`
SyncRole string `xml:"SyncRole,omitempty"`
SourceSelectionCriteria *SourceSelectionCriteriaType `xml:"SourceSelectionCriteria,omitempty"`
EncryptionConfiguration *EncryptionConfigurationType `xml:"EncryptionConfiguration,omitempty"`
}
type ReplicationConfiguration struct {
XMLName xml.Name `xml:"ReplicationConfiguration"`
Rules []ReplicationRule `xml:"Rule"`
}
type ReplicationProgress struct {
XMLName xml.Name `xml:"ReplicationProgress"`
ID string `xml:"Rule>ID"`
HistoricalObject string `xml:"Rule>Progress>HistoricalObject"`
NewObject string `xml:"Rule>Progress>NewObject"`
}
func expandReplicationRule(d *schema.ResourceData) ReplicationRule {
r := d.Get("").(map[string]interface{})
rule := ReplicationRule{}
// ID
if val, ok := r["rule_id"].(string); ok && val != "" {
rule.ID = val
}
// Action
if val, ok := r["action"].(string); ok && val != "" {
rule.Action = val
}
// Status
if val, ok := r["status"].(string); ok && val != "" {
rule.Status = val
}
// HistoricalObjectReplication
if val, ok := r["historical_object_replication"].(string); ok && val != "" {
rule.HistoricalObjectReplication = val
}
// SyncRole
if val, ok := r["sync_role"].(string); ok && val != "" {
rule.SyncRole = val
}
// PrefixSet
if val, ok := r["prefix_set"].([]interface{}); ok && len(val) > 0 && val[0] != nil {
e := val[0].(map[string]interface{})
i := PrefixSetType{}
if v, ok := e["prefixes"]; ok {
var prefixes []string
for _, prefix := range v.([]interface{}) {
prefixes = append(prefixes, prefix.(string))
}
i.Prefixes = prefixes
}
rule.PrefixSet = &i
}
// Destination
if val, ok := r["destination"].([]interface{}); ok && len(val) > 0 && val[0] != nil {
e := val[0].(map[string]interface{})
i := DestinationType{}
// Bucket
if val, ok := e["bucket"].(string); ok && val != "" {
i.Bucket = val
}
// Location
if val, ok := e["location"].(string); ok && val != "" {
i.Location = val
}
// TransferType
if val, ok := e["transfer_type"].(string); ok && val != "" {
i.TransferType = val
}
rule.Destination = &i
}
// SourceSelectionCriteria
if val, ok := r["source_selection_criteria"].([]interface{}); ok && len(val) > 0 && val[0] != nil {
e := val[0].(map[string]interface{})
i := SourceSelectionCriteriaType{}
//SseKmsEncryptedObjects
if val1, ok := e["sse_kms_encrypted_objects"].([]interface{}); ok && len(val1) > 0 && val1[0] != nil {
s := val1[0].(map[string]interface{})
j := SseKmsEncryptedObjectsType{}
if v, ok := s["status"].(string); ok && v != "" {
j.Status = v
}
i.SseKmsEncryptedObjects = &j
}
rule.SourceSelectionCriteria = &i
}
// EncryptionConfiguration
if val, ok := r["encryption_configuration"].([]interface{}); ok && len(val) > 0 && val[0] != nil {
e := val[0].(map[string]interface{})
i := EncryptionConfigurationType{}
// ReplicaKmsKeyID
if val, ok := e["replica_kms_key_id"].(string); ok && val != "" {
i.ReplicaKmsKeyID = val
}
rule.EncryptionConfiguration = &i
}
return rule
}
func flattenReplicationRule(d *schema.ResourceData, rc *ReplicationConfiguration, rp map[string]interface{}, ruleId string) error {
rule := make(map[string]interface{})
for _, r := range rc.Rules {
if r.ID != ruleId {
continue
}
// ID
if r.ID != "" {
rule["rule_id"] = r.ID
}
// Action
if r.Action != "" {
rule["action"] = r.Action
}
// Status
if r.Status != "" {
rule["status"] = r.Status
}
// HistoricalObjectReplication
if r.HistoricalObjectReplication != "" {
rule["historical_object_replication"] = r.HistoricalObjectReplication
}
// SyncRole
if r.SyncRole != "" {
rule["sync_role"] = r.SyncRole
}
// PrefixSet
if r.PrefixSet != nil {
prefixSet := make(map[string]interface{})
// prefixes
if len(r.PrefixSet.Prefixes) != 0 {
prefixSet["prefixes"] = r.PrefixSet.Prefixes
}
rule["prefix_set"] = []interface{}{prefixSet}
}
// Destination
if r.Destination != nil {
destination := make(map[string]interface{})
// Bucket
if r.Destination.Bucket != "" {
destination["bucket"] = r.Destination.Bucket
}
// Location
if r.Destination.Location != "" {
destination["location"] = r.Destination.Location
}
// TransferType
if r.Destination.TransferType != "" {
destination["transfer_type"] = r.Destination.TransferType
}
rule["destination"] = []interface{}{destination}
}
// SourceSelectionCriteria
if r.SourceSelectionCriteria != nil {
sourceSelectionCriteria := make(map[string]interface{})
if r.SourceSelectionCriteria.SseKmsEncryptedObjects != nil {
sseKmsEncryptedObjects := make(map[string]interface{})
if r.SourceSelectionCriteria.SseKmsEncryptedObjects.Status != "" {
sseKmsEncryptedObjects["status"] = r.SourceSelectionCriteria.SseKmsEncryptedObjects.Status
}
sourceSelectionCriteria["sse_kms_encrypted_objects"] = []interface{}{sseKmsEncryptedObjects}
}
rule["source_selection_criteria"] = []interface{}{sourceSelectionCriteria}
}
// EncryptionConfiguration
if r.EncryptionConfiguration != nil {
encryptionConfiguration := make(map[string]interface{})
if r.EncryptionConfiguration.ReplicaKmsKeyID != "" {
encryptionConfiguration["replica_kms_key_id"] = r.EncryptionConfiguration.ReplicaKmsKeyID
}
rule["encryption_configuration"] = []interface{}{encryptionConfiguration}
}
//Progress
if rp != nil {
if val, ok := rp[r.ID].(*ReplicationProgress); ok && val != nil {
progress := make(map[string]interface{})
if val.HistoricalObject != "" {
progress["historical_object"] = val.HistoricalObject
}
if val.NewObject != "" {
progress["new_object"] = val.NewObject
}
rule["progress"] = []interface{}{progress}
}
}
}
for k, v := range rule {
if err := d.Set(k, v); err != nil {
return err
}
}
return nil
}
func retrieveReplicationRules(client *connectivity.AliyunClient, bucket string) (*ReplicationConfiguration, error) {
// Read the replication configuration
var requestInfo *oss.Client
raw, err := client.WithOssClient(func(ossClient *oss.Client) (interface{}, error) {
requestInfo = ossClient
return ossClient.GetBucketReplication(bucket)
})
if err != nil {
return nil, err
}
addDebug("GetBucketReplication", raw, requestInfo, map[string]interface{}{
"bucketName": bucket,
})
var rc ReplicationConfiguration
var body []byte = []byte(raw.(string))
if err := xml.Unmarshal(body, &rc); err != nil {
return nil, err
}
return &rc, nil
}
func isReplicationRuleExist(client *connectivity.AliyunClient, bucket string, ruleId string) (bool, error) {
// Read the replication Progress by rule id
_, err := client.WithOssClient(func(ossClient *oss.Client) (interface{}, error) {
return ossClient.GetBucketReplicationProgress(bucket, ruleId)
})
if err == nil {
return true, nil
}
if IsExpectedErrors(err, []string{"NoSuchReplicationRule", "NoSuchBucket"}) {
return false, nil
}
return false, err
}
func retrieveReplicationRuleProgress(client *connectivity.AliyunClient, bucket string, ruleId string) (ReplicationProgress, error) {
var rp ReplicationProgress
var requestInfo *oss.Client
// Read the replication Progress by rule id
raw, err := client.WithOssClient(func(ossClient *oss.Client) (interface{}, error) {
requestInfo = ossClient
return ossClient.GetBucketReplicationProgress(bucket, ruleId)
})
if err != nil {
return rp, err
}
addDebug("GetBucketReplicationProgress", raw, requestInfo, map[string]interface{}{
"bucketName": bucket,
"ruleId": ruleId,
})
var body []byte = []byte(raw.(string))
if err := xml.Unmarshal(body, &rp); err != nil {
return rp, WrapErrorf(err, DefaultErrorMsg, bucket, "Unmarshal XML", AliyunOssGoSdk)
}
return rp, err
}
func hasProgressBlock(d *schema.ResourceData) bool {
return true
}
func resourceAlicloudOssBucketReplicationCreate(d *schema.ResourceData, meta interface{}) error {
client := meta.(*connectivity.AliyunClient)
bucket := d.Get("bucket").(string)
rules := make([]ReplicationRule, 0)
rule := expandReplicationRule(d)
rc := &ReplicationConfiguration{
Rules: append(rules, rule),
}
bs, err := xml.Marshal(rc)
if err != nil {
return WrapErrorf(err, DefaultErrorMsg, bucket, "Marshal to XML", AliyunOssGoSdk)
}
var xmlString string = string(bs[:])
var requestInfo *oss.Client
var replicationAlreadyExist bool
var respHeader http.Header
replicationAlreadyExist = false
raw, err := client.WithOssClient(func(ossClient *oss.Client) (interface{}, error) {
requestInfo = ossClient
return nil, ossClient.PutBucketReplication(bucket, xmlString, oss.GetResponseHeader(&respHeader))
})
if err != nil {
if !IsExpectedErrors(err, []string{"BucketReplicationAlreadyExist"}) {
return WrapErrorf(err, DefaultErrorMsg, bucket, "PutBucketReplication", AliyunOssGoSdk)
}
replicationAlreadyExist = true
}
addDebug("PutBucketReplication", raw, requestInfo, map[string]interface{}{
"bucketName": bucket,
"ReplicationConfiguration": xmlString,
"ReplicationAlreadyExist": replicationAlreadyExist,
})
var ruleId string
if respHeader != nil {
ruleId = respHeader.Get("x-oss-replication-rule-id")
} else {
ruleId = ""
}
if len(ruleId) == 0 {
//OSS server does not return rule-id and only supports one rule currently, obtains rule id through GetBucketReplication
rc, err = retrieveReplicationRules(client, bucket)
if err != nil {
return WrapErrorf(err, DefaultErrorMsg, bucket, "retrieveReplicationRules", AliyunOssGoSdk)
}
ruleId = rc.Rules[0].ID
}
d.SetId(fmt.Sprintf("%s%s%s", bucket, COLON_SEPARATED, ruleId))
return resourceAlicloudOssBucketReplicationRead(d, meta)
}
func resourceAlicloudOssBucketReplicationRead(d *schema.ResourceData, meta interface{}) error {
client := meta.(*connectivity.AliyunClient)
parts, err := ParseResourceId(d.Id(), 2)
if err != nil {
return WrapError(err)
}
bucket := parts[0]
ruleId := parts[1]
// Read the replication configuration
rc, err := retrieveReplicationRules(client, bucket)
if IsExpectedErrors(err, []string{"NoSuchReplicationConfiguration", "NoSuchBucket"}) {
log.Printf("[WARN] OSS Bucket Replication Configuration (%s) not found, removing from state", bucket)
d.SetId("")
return nil
}
if err != nil {
return WrapErrorf(err, DefaultErrorMsg, bucket, "retrieveReplicationRules", AliyunOssGoSdk)
}
//Read the replication progress by rule id
rule_progress := make(map[string]interface{})
if hasProgressBlock(d) {
if val, err := retrieveReplicationRuleProgress(client, bucket, ruleId); err == nil {
rule_progress[ruleId] = &val
}
}
// Update to resouce data
d.Set("bucket", bucket)
err = flattenReplicationRule(d, rc, rule_progress, ruleId)
if err != nil {
return WrapError(err)
}
return nil
}
func resourceAlicloudOssBucketReplicationDelete(d *schema.ResourceData, meta interface{}) error {
client := meta.(*connectivity.AliyunClient)
parts, err := ParseResourceId(d.Id(), 2)
if err != nil {
return WrapError(err)
}
bucket := parts[0]
ruleId := parts[1]
var requestInfo *oss.Client
raw, err := client.WithOssClient(func(ossClient *oss.Client) (interface{}, error) {
requestInfo = ossClient
return nil, ossClient.DeleteBucketReplication(bucket, ruleId)
})
if IsExpectedErrors(err, []string{"NoSuchReplicationConfiguration", "NoSuchBucket"}) {
return nil
}
if err != nil {
return WrapErrorf(err, DefaultErrorMsg, bucket, "DeleteBucketReplication", AliyunOssGoSdk)
}
addDebug("DeleteBucketReplication", raw, requestInfo, map[string]interface{}{
"bucketName": bucket,
"ruleId": ruleId,
})
// wait until the replication configuration is closed
_ = resource.Retry(d.Timeout(schema.TimeoutDelete), func() *resource.RetryError {
raw, _ := isReplicationRuleExist(client, bucket, ruleId)
if raw {
time.Sleep(time.Duration(10) * time.Second)
return resource.RetryableError(Error("in closing status"))
}
return nil
})
return nil
}