common/change_stream.go (160 lines of code) (raw):
package utils
import (
"context"
"fmt"
LOG "github.com/vinllen/log4go"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"time"
"sort"
)
const (
contextTimeout = 60 * time.Second
changeStreamTimeout = 24 // hours
)
type ChangeStreamConn struct {
Client *mongo.Client
CsHandler *mongo.ChangeStream
Ops *options.ChangeStreamOptions
ctx context.Context
conn *MongoCommunityConn
}
func NewChangeStreamConn(src string,
mode string,
fullDoc bool,
specialDb string,
filterFunc func(name string) bool,
watchStartTime interface{},
batchSize int32,
sourceDbversion string,
sslRootFile string) (*ChangeStreamConn, error) {
conn, err := NewMongoCommunityConn(src, mode, true, ReadWriteConcernMajority, "", sslRootFile)
if err != nil {
return nil, fmt.Errorf("NewChangeStreamConn source[%v %v] build connection failed: %v",
src, mode, err)
}
waitTime := time.Duration(changeStreamTimeout * time.Hour) // hours
ops := &options.ChangeStreamOptions{
MaxAwaitTime: &waitTime,
BatchSize: &batchSize,
}
if watchStartTime != nil {
if val, ok := watchStartTime.(int64); ok {
if (val >> 32) > 1 {
startTime := &primitive.Timestamp{
T: uint32(val >> 32),
I: uint32(val & Int32max),
}
ops.SetStartAtOperationTime(startTime)
}
} else {
// ResumeToken,sourceDbversion >= 4.2 use StartAfter, < 4.2 use ResumeAfter
if val_ver, _ := GetAndCompareVersion(nil, "4.2.0", sourceDbversion); val_ver {
ops.SetStartAfter(watchStartTime)
} else {
ops.SetResumeAfter(watchStartTime)
}
}
}
if fullDoc {
ops.SetFullDocument(options.UpdateLookup)
}
var csHandler *mongo.ChangeStream
if specialDb == VarSpecialSourceDBFlagAliyunServerless {
_, dbs, err := GetDbNamespace(src, filterFunc, sslRootFile)
if err != nil {
return nil, fmt.Errorf("GetDbNamespace failed: %v", err)
}
dbList := make([]string, 0, len(dbs))
for name := range dbs {
dbList = append(dbList, name)
}
sort.Strings(dbList)
if len(dbList) == 0 {
return nil, fmt.Errorf("db list is empty")
}
// TODO(jianyou) deprecate aliyun_serverless
//ops.SetMultiDbSelections("(" + strings.Join(dbList, "|") + ")")
LOG.Info("change stream options with aliyun_serverless: %v", printCsOption(ops))
// csHandler, err = client.Database("non-exist-database-shake").Watch(ctx, mongo.Pipeline{}, ops)
csHandler, err = conn.Client.Database("serverless-shake-fake-db").
Collection("serverless-shake-fake-collection").
Watch(conn.ctx, mongo.Pipeline{}, ops)
// csHandler, err = client.Database(dbList[0]).Collection("serverless-shake-fake-collection").Watch(ctx, mongo.Pipeline{}, ops)
if err != nil {
return nil, fmt.Errorf("client[%v] create change stream handler failed[%v]", src, err)
}
} else {
LOG.Info("new change stream with options: %v", printCsOption(ops))
csHandler, err = conn.Client.Watch(conn.ctx, mongo.Pipeline{}, ops)
if err != nil {
if conn != nil {
conn.Close()
}
LOG.Error("client[%v] create change stream handler failed[%v]", src, err)
return nil, fmt.Errorf("client[%v] create change stream handler failed[%v]", src, err)
}
}
return &ChangeStreamConn{
Client: conn.Client,
CsHandler: csHandler,
Ops: ops,
ctx: conn.ctx,
conn: conn,
}, nil
}
func (csc *ChangeStreamConn) Close() {
if csc.CsHandler != nil {
csc.CsHandler.Close(csc.ctx)
csc.CsHandler = nil
}
// 关闭ChangeStream时,同时关闭NewMongoCommunityConn连接
if csc.conn != nil {
csc.conn.Close()
}
if csc.Client != nil {
csc.Client = nil
}
}
func (csc *ChangeStreamConn) IsNotNil() bool {
return csc != nil && csc.Client != nil && csc.CsHandler != nil
}
func (csc *ChangeStreamConn) GetNext() (bool, []byte) {
if ok := csc.CsHandler.Next(csc.ctx); !ok {
return false, nil
}
return true, csc.CsHandler.Current
}
func (csc *ChangeStreamConn) TryNext() (bool, []byte) {
if ok := csc.CsHandler.TryNext(csc.ctx); !ok {
return false, nil
}
return true, csc.CsHandler.Current
}
func (csc *ChangeStreamConn) ResumeToken() interface{} {
out := csc.CsHandler.ResumeToken()
if len(out) == 0 {
return nil
}
return out
}
func printCsOption(ops *options.ChangeStreamOptions) string {
var ret string
if ops.BatchSize != nil {
ret = fmt.Sprintf("%v BatchSize[%v]", ret, *ops.BatchSize)
}
if ops.Collation != nil {
ret = fmt.Sprintf("%v Collation[%v]", ret, *ops.Collation)
}
if ops.FullDocument != nil {
ret = fmt.Sprintf("%v FullDocument[%v]", ret, *ops.FullDocument)
}
if ops.MaxAwaitTime != nil {
ret = fmt.Sprintf("%v MaxAwaitTime[%v]", ret, *ops.MaxAwaitTime)
}
if ops.ResumeAfter != nil {
ret = fmt.Sprintf("%v ResumeAfter[%v]", ret, ops.ResumeAfter)
}
if ops.StartAtOperationTime != nil {
ret = fmt.Sprintf("%v StartAtOperationTime[%v]", ret, *ops.StartAtOperationTime)
}
if ops.StartAfter != nil {
ret = fmt.Sprintf("%v StartAfter[%v]", ret, ops.StartAfter)
}
//if ops.MultiDbSelections != "" {
// ret = fmt.Sprintf("%v MultiDbSelections[%v]", ret, ops.MultiDbSelections)
//}
return ret
}