collector/docsyncer/doc_reader.go (259 lines of code) (raw):
package docsyncer
import (
"context"
"fmt"
conf "github.com/alibaba/MongoShake/v2/collector/configure"
utils "github.com/alibaba/MongoShake/v2/common"
"go.mongodb.org/mongo-driver/bson"
"sync/atomic"
LOG "github.com/vinllen/log4go"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
/*************************************************/
// splitter: pre-split the collection into several pieces
type DocumentSplitter struct {
	src           string                    // source mongo address url
	sslRootCaFile string                    // source root ca ssl
	ns            utils.NS                  // namespace being split
	client        *utils.MongoCommunityConn // connection used for collStats / splitVector commands
	readerChan    chan *DocumentReader      // reader chan; Run() produces, syncer workers consume
	pieceByteSize uint64                    // each piece max byte size (capped at 8GB in the constructor)
	count         uint64                    // total document number, taken from collStats
	pieceNumber   int                       // how many pieces were produced (split points + 1)
}
// NewDocumentSplitter connects to the source mongo, measures the collection via
// collStats to size the split pieces, and starts a background goroutine running
// ds.Run() which feeds ds.readerChan. Returns nil when the connection or the
// collStats command fails (errors are logged, not returned).
func NewDocumentSplitter(src, sslRootCaFile string, ns utils.NS) *DocumentSplitter {
	ds := &DocumentSplitter{
		src:           src,
		sslRootCaFile: sslRootCaFile,
		ns:            ns,
	}

	// create connection
	var err error
	// disable timeout
	ds.client, err = utils.NewMongoCommunityConn(ds.src, conf.Options.MongoConnectMode, false,
		utils.ReadWriteConcernLocal, utils.ReadWriteConcernDefault, sslRootCaFile)
	if err != nil {
		LOG.Error("splitter[%s] connection mongo[%v] failed[%v]", ds,
			utils.BlockMongoUrlPassword(ds.src, "***"), err)
		return nil
	}

	// get total count and size via collStats so pieces can be sized by bytes
	var res struct {
		Count       int64   `bson:"count"`
		Size        float64 `bson:"size"`
		StorageSize float64 `bson:"storageSize"`
	}
	if err := ds.client.Client.Database(ds.ns.Database).RunCommand(nil,
		bson.D{{"collStats", ds.ns.Collection}}).Decode(&res); err != nil {
		LOG.Error("splitter[%s] connection mongo[%v] failed[%v]", ds,
			utils.BlockMongoUrlPassword(ds.src, "***"), err)
		return nil
	}

	ds.count = uint64(res.Count)
	// target piece size = collection bytes / parallel threads
	// NOTE(review): if FullSyncReaderParallelThread were 0 this divides by zero
	// (float +Inf); config presumably guarantees >= 1 — confirm upstream validation.
	ds.pieceByteSize = uint64(res.Size / float64(conf.Options.FullSyncReaderParallelThread))
	if ds.pieceByteSize > 8*utils.GB {
		// at most 8GB per chunk
		ds.pieceByteSize = 8 * utils.GB
	}

	LOG.Info("NewDocumentSplitter db[%v] col[%v] res[%v], pieceByteSize[%v]",
		ds.ns.Database, ds.ns.Collection, res, ds.pieceByteSize)

	if conf.Options.FullSyncReaderParallelThread <= 1 {
		// single reader: channel only ever holds one element
		ds.readerChan = make(chan *DocumentReader, 1)
	} else {
		// NOTE(review): 4196 looks like a typo for 4096 — harmless (just a
		// buffer size) but worth confirming the intended value.
		ds.readerChan = make(chan *DocumentReader, 4196)
	}

	// produce readers asynchronously; Run() closes readerChan when done
	go func() {
		if err := ds.Run(); err != nil {
			LOG.Crash(err)
		}
	}()

	return ds
}
// Close releases the splitter's mongo connection. Safe to call once the
// background Run() goroutine has finished producing readers.
func (ds *DocumentSplitter) Close() {
	ds.client.Close()
}
// String renders the splitter for logging, with the url password masked.
func (ds *DocumentSplitter) String() string {
	maskedURL := utils.BlockMongoUrlPassword(ds.src, "***")
	sizeMB := ds.pieceByteSize / utils.MB
	return fmt.Sprintf("DocumentSplitter src[%s] ns[%s] count[%v] pieceByteSize[%v MB] pieceNumber[%v]",
		maskedURL, ds.ns, ds.count, sizeMB, ds.pieceNumber)
}
// TODO, need add retry
// Run produces DocumentReaders onto ds.readerChan: one reader covering the
// whole namespace when splitting is disabled or splitVector fails, otherwise
// one reader per split range. Always closes readerChan on return so consumers
// see end-of-stream.
func (ds *DocumentSplitter) Run() error {
	// close channel
	defer close(ds.readerChan)

	// disable split
	if conf.Options.FullSyncReaderParallelThread <= 1 {
		LOG.Info("splitter[%s] disable split or no need", ds)
		ds.readerChan <- NewDocumentReader(0, ds.src, ds.ns, "", nil, nil, ds.sslRootCaFile)
		LOG.Info("splitter[%s] exits", ds)
		return nil
	}

	LOG.Info("splitter[%s] enable split, waiting splitVector return...", ds)
	var res bson.M
	// BUG FIX: Decode requires a pointer; the previous Decode(res) passed the
	// nil map by value, so decoding always failed and the code silently fell
	// back to single-threaded fetching.
	err := ds.client.Client.Database(ds.ns.Database).RunCommand(nil, bson.D{
		{"splitVector", ds.ns.Str()},
		{"keyPattern", bson.M{conf.Options.FullSyncReaderParallelIndex: 1}},
		// {"maxSplitPoints", ds.pieceNumber - 1},
		{"maxChunkSize", ds.pieceByteSize / utils.MB},
	}).Decode(&res)
	// if failed, do not panic, run single thread fetching
	if err != nil {
		LOG.Warn("splitter[%s] run splitVector failed[%v], give up parallel fetching", ds, err)
		ds.readerChan <- NewDocumentReader(0, ds.src, ds.ns, "", nil, nil, ds.sslRootCaFile)
		LOG.Info("splitter[%s] exits", ds)
		return nil
	}

	LOG.Info("splitter[%s] run splitVector result: %v", ds, res)
	if splitKeys, ok := res["splitKeys"]; ok {
		if splitKeysList, ok := splitKeys.([]interface{}); ok && len(splitKeysList) > 0 {
			// return list is sorted; n split points produce n+1 pieces
			ds.pieceNumber = len(splitKeysList) + 1
			var start interface{}
			cnt := 0
			for i, keyDoc := range splitKeysList {
				// check key == conf.Options.FullSyncReaderParallelIndex
				key, val, err := parseDocKeyValue(keyDoc)
				if err != nil {
					LOG.Crash("splitter[%s] parse doc key failed: %v", ds, err)
				}
				if key != conf.Options.FullSyncReaderParallelIndex {
					LOG.Crash("splitter[%s] parse doc invalid key: %v", ds, key)
				}
				LOG.Info("splitter[%s] piece[%d] create reader with boundary(%v, %v]", ds, cnt, start, val)
				// inject new DocumentReader into channel
				ds.readerChan <- NewDocumentReader(cnt, ds.src, ds.ns, key, start, val, ds.sslRootCaFile)
				// new start
				start = val
				cnt++
				// last split point: emit the open-ended tail piece (start, INF)
				if i == len(splitKeysList)-1 {
					LOG.Info("splitter[%s] piece[%d] create reader with boundary(%v, INF)", ds, cnt, start)
					// inject new DocumentReader into channel
					ds.readerChan <- NewDocumentReader(cnt, ds.src, ds.ns, key, start, nil, ds.sslRootCaFile)
				}
			}
			return nil
		}
		LOG.Warn("splitter[%s] run splitVector return empty result[%v]", ds, res)
	} else {
		LOG.Warn("splitter[%s] run splitVector return null result[%v]", ds, res)
	}

	// fallback: one reader for the full range.
	// (fixed: removed stray nil err argument from the Warn call, and the
	// duplicated "exits" log line)
	LOG.Warn("splitter[%s] give up parallel fetching", ds)
	ds.readerChan <- NewDocumentReader(0, ds.src, ds.ns, "", nil, nil, ds.sslRootCaFile)
	LOG.Info("splitter[%s] exits", ds)
	return nil
}
// parseDocKeyValue extracts the single key/value pair from a splitVector
// boundary document. Returns an error (instead of panicking) when x is not a
// bson.M or does not contain exactly one entry.
func parseDocKeyValue(x interface{}) (string, interface{}, error) {
	// comma-ok assertion: the previous bare x.(bson.M) panicked on foreign types
	keyDocM, ok := x.(bson.M)
	if !ok {
		return "", nil, fmt.Errorf("invalid key doc type[%T]", x)
	}
	// exactly one entry expected; the old len > 1 check let an empty doc
	// return an empty key that would crash the caller later
	if len(keyDocM) != 1 {
		return "", nil, fmt.Errorf("invalid key doc[%v]", keyDocM)
	}
	for key, val := range keyDocM {
		return key, val, nil
	}
	// unreachable: len(keyDocM) == 1 guarantees the loop returns
	return "", nil, fmt.Errorf("invalid key doc[%v]", keyDocM)
}
/*************************************************/
// DocumentReader: the reader of single piece
type DocumentReader struct {
	// source mongo address url
	src           string
	ns            utils.NS
	sslRootCaFile string // source root ca ssl

	// mongo document reader
	client      *utils.MongoCommunityConn
	docCursor   *mongo.Cursor   // built lazily by ensureNetwork
	ctx         context.Context // context used for cursor iteration and close
	rebuild     int             // rebuild times; ensureNetwork rejects more than one build
	concurrency int32           // for test only

	// query statement and current max cursor
	query bson.M // range filter on the split key; empty means full collection
	key   string // split key name; empty when the reader covers the full collection
	id    int    // piece id assigned by the splitter
}
// NewDocumentReader creates reader with mongodb url.
// start/end bound the split key with the half-open range (start, end]; either
// side may be nil for an unbounded edge, and both nil means no filter at all.
func NewDocumentReader(id int, src string, ns utils.NS, key string, start, end interface{}, sslRootCaFile string) *DocumentReader {
	filter := bson.M{}
	bounds := bson.M{}
	if start != nil {
		bounds["$gt"] = start
	}
	if end != nil {
		bounds["$lte"] = end
	}
	if len(bounds) > 0 {
		filter[key] = bounds
	}

	return &DocumentReader{
		id:            id,
		src:           src,
		ns:            ns,
		sslRootCaFile: sslRootCaFile,
		query:         filter,
		key:           key,
		ctx:           context.Background(),
	}
}
// String renders the reader for logging, masking the url password and
// appending the cursor id when a cursor exists.
func (reader *DocumentReader) String() string {
	desc := fmt.Sprintf("DocumentReader id[%v], src[%v] ns[%s] query[%v]", reader.id,
		utils.BlockMongoUrlPassword(reader.src, "***"), reader.ns, reader.query)
	if reader.docCursor == nil {
		return desc
	}
	return fmt.Sprintf("%s docCursorId[%v]", desc, reader.docCursor.ID())
}
// NextDoc returns one document as raw bytes ([]byte), or (nil, nil) when the
// cursor is exhausted.
// reader.docCursor.Current is only valid until the next docCursor.Next() call,
// so the bytes MUST be copied before returning — the previous code returned
// Current directly, letting the driver overwrite the caller's document.
func (reader *DocumentReader) NextDoc() (doc bson.Raw, err error) {
	if err := reader.ensureNetwork(); err != nil {
		return nil, err
	}

	atomic.AddInt32(&reader.concurrency, 1)
	defer atomic.AddInt32(&reader.concurrency, -1)

	if !reader.docCursor.Next(reader.ctx) {
		if err := reader.docCursor.Err(); err != nil {
			// iteration error: drop the cursor so a caller sees a clean state
			reader.releaseCursor()
			return nil, err
		}
		// no error and no document: normal end of the piece
		LOG.Info("reader[%s] finish", reader.String())
		return nil, nil
	}

	// copy Current so the returned document survives the next Next() call
	doc = make(bson.Raw, len(reader.docCursor.Current))
	copy(doc, reader.docCursor.Current)
	return doc, nil
}
// ensureNetwork establish the mongodb connection at first
// if current connection is not ready or disconnected.
// It lazily creates the client and the find cursor; a cursor may only be
// built once per reader (see rebuild check below).
func (reader *DocumentReader) ensureNetwork() (err error) {
	// cursor already exists: nothing to do
	if reader.docCursor != nil {
		return nil
	}

	if reader.client == nil {
		LOG.Info("reader[%s] client is empty, create one", reader.String())
		reader.client, err = utils.NewMongoCommunityConn(reader.src, conf.Options.MongoConnectMode, true,
			utils.ReadWriteConcernLocal, utils.ReadWriteConcernDefault, conf.Options.MongoSslRootCaFile)
		if err != nil {
			return err
		}
	}

	// only a single cursor build is allowed per reader; rebuilding after a
	// partial scan is rejected as illegal
	reader.rebuild += 1
	if reader.rebuild > 1 {
		return fmt.Errorf("reader[%s] rebuild illegal", reader.String())
	}

	findOptions := new(options.FindOptions)
	// deterministic scan order by _id
	findOptions.SetSort(map[string]interface{}{
		"_id": 1,
	})
	findOptions.SetBatchSize(int32(conf.Options.FullSyncReaderFetchBatchSize)) // set big for test
	// force the _id index
	// NOTE(review): the range filter is on the split key, which may not be
	// _id — confirm hinting _id is intended for split readers
	findOptions.SetHint(map[string]interface{}{
		"_id": 1,
	})

	// enable noCursorTimeout anyway! #451 #784
	// (skipped for time-series collections — presumably they reject the
	// noCursorTimeout option; confirm against driver/server docs)
	if reader.client.IsTimeSeriesCollection(reader.ns.Database, reader.ns.Collection) == false {
		findOptions.SetNoCursorTimeout(true)
	}
	findOptions.SetComment(fmt.Sprintf("mongo-shake full sync: ns[%v] query[%v] rebuid-times[%v]",
		reader.ns, reader.query, reader.rebuild))

	reader.docCursor, err = reader.client.Client.Database(reader.ns.Database).Collection(reader.ns.Collection, nil).
		Find(nil, reader.query, findOptions)
	if err != nil {
		return fmt.Errorf("run find failed: %v", err)
	}

	LOG.Info("reader[%s] generates new cursor", reader.String())
	return nil
}
// releaseCursor closes the current cursor (if any) and always leaves
// reader.docCursor nil afterwards. Close failures are logged, not returned.
func (reader *DocumentReader) releaseCursor() {
	if cursor := reader.docCursor; cursor != nil {
		LOG.Info("reader[%s] closes cursor[%v]", reader, cursor.ID())
		if closeErr := cursor.Close(reader.ctx); closeErr != nil {
			LOG.Error("release cursor fail: %v", closeErr)
		}
	}
	reader.docCursor = nil
}
// Close shuts the reader down: closes any open cursor and disconnects the
// client, then clears the client so a later Close is a no-op on it.
func (reader *DocumentReader) Close() {
	LOG.Info("reader[%s] close", reader)
	if cursor := reader.docCursor; cursor != nil {
		cursor.Close(reader.ctx)
	}
	if conn := reader.client; conn != nil {
		conn.Client.Disconnect(reader.ctx)
		reader.client = nil
	}
}