datasource/mongo/sync/sync.go (73 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package sync
import (
"context"
"github.com/apache/servicecomb-service-center/pkg/util"
"github.com/go-chassis/cari/db/mongo"
"github.com/go-chassis/cari/sync"
)
const (
CollectionTask = "task"
CollectionTombstone = "tombstone"
)
type Options struct {
ResourceID string
Opts map[string]string
}
type Option func(options *Options)
func NewSyncOption() Options {
return Options{}
}
func WithResourceID(resourceID string) Option {
return func(options *Options) {
options.ResourceID = resourceID
}
}
func WithOpts(opts map[string]string) Option {
return func(options *Options) {
options.Opts = opts
}
}
func DoCreateOpts(ctx context.Context, resourceType string, resource interface{}, options ...Option) error {
return doOpts(ctx, sync.CreateAction, resourceType, resource, options...)
}
func DoUpdateOpts(ctx context.Context, resourceType string, resource interface{}, options ...Option) error {
return doOpts(ctx, sync.UpdateAction, resourceType, resource, options...)
}
func DoDeleteOpts(ctx context.Context, resourceType, resourceID string, resource interface{}, options ...Option) error {
options = append(options, WithResourceID(resourceID))
return doOpts(ctx, sync.DeleteAction, resourceType, resource, options...)
}
func doOpts(ctx context.Context, action string, resourceType string, resource interface{}, options ...Option) error {
if !util.EnableSync(ctx) {
return nil
}
syncOpts := NewSyncOption()
for _, option := range options {
option(&syncOpts)
}
err := doTaskOpt(ctx, action, resourceType, resource, &syncOpts)
if err != nil || action != sync.DeleteAction {
return err
}
return doTombstoneOpt(ctx, resourceType, syncOpts.ResourceID)
}
func doTaskOpt(ctx context.Context, action string, resourceType string, resource interface{}, syncOpts *Options) error {
domain := util.ParseDomain(ctx)
project := util.ParseProject(ctx)
task, err := sync.NewTask(domain, project, action, resourceType, resource)
if err != nil {
return err
}
if syncOpts != nil {
task.Opts = syncOpts.Opts
}
_, err = mongo.GetClient().GetDB().Collection(CollectionTask).InsertOne(ctx, task)
return err
}
func doTombstoneOpt(ctx context.Context, resourceType, resourceID string) error {
domain := util.ParseDomain(ctx)
project := util.ParseProject(ctx)
tombstone := sync.NewTombstone(domain, project, resourceType, resourceID)
_, err := mongo.GetClient().GetDB().Collection(CollectionTombstone).InsertOne(ctx, tombstone)
return err
}