collector/reader/oplog_reader.go (204 lines of code) (raw):

package sourceReader

// read oplog from source mongodb

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/bson/primitive"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"

	conf "github.com/alibaba/MongoShake/v2/collector/configure"
	utils "github.com/alibaba/MongoShake/v2/common"
	"github.com/alibaba/MongoShake/v2/oplog"

	LOG "github.com/vinllen/log4go"
)

const (
	// QueryTs is the key of the timestamp clause inside the oplog query.
	QueryTs = "ts"
	// QueryGid is the key of the gid clause inside the oplog query.
	QueryGid = "g"
	// QueryOpGT and QueryOpGTE are the comparison operators used in the query.
	QueryOpGT  = "$gt"
	QueryOpGTE = "$gte"

	tailTimeout = 7
	localDB     = "local"
)

// TimeoutError is returned when no oplog became available in time.
var TimeoutError = errors.New("read next log timeout, It shouldn't be happen")

// CollectionCappedError is returned when the requested starting point is no
// longer covered by the oplog collection.
var CollectionCappedError = errors.New("collection capped error")

// OplogReader represents a stream reader from the mongodb specified by a url,
// together with query options. Users iterate oplogs through Next/NextOplog.
type OplogReader struct {
	// source mongo address url
	src     string
	replset string

	// mongo oplog reader
	conn         *utils.MongoCommunityConn
	oplogsCursor *mongo.Cursor

	// query statement and current max cursor
	query bson.M

	// oplog channel: filled by the fetcher goroutine, drained by get()
	oplogChan    chan *retOplog
	fetcherExist bool
	fetcherLock  sync.Mutex

	// true until the first cursor has been created by EnsureNetwork
	firstRead bool
}

// NewOplogReader creates a reader for the given mongodb url and replica set
// name. The query starts empty; a starting timestamp has to be set through
// SetQueryTimestampOnEmpty or UpdateQueryTimestamp before reading.
func NewOplogReader(src string, replset string) *OplogReader {
	return &OplogReader{
		src:       src,
		replset:   replset,
		query:     bson.M{},
		oplogChan: make(chan *retOplog, ChannelSize), // ten times of batchSize
		firstRead: true,
	}
}

// String describes the reader with the password of the source url masked.
func (or *OplogReader) String() string {
	return fmt.Sprintf("oplogReader[src:%s replset:%s]",
		utils.BlockMongoUrlPassword(or.src, "***"), or.replset)
}

// Name returns the identifier of the oplog fetch method.
func (or *OplogReader) Name() string {
	return utils.VarIncrSyncMongoFetchMethodOplog
}

// SetQueryTimestampOnEmpty sets the query timestamp only if the query does
// not hold one yet, which is the initial stage most of the time.
//
// ts must hold an int64; any other dynamic type makes the type assertion
// panic.
func (or *OplogReader) SetQueryTimestampOnEmpty(ts interface{}) {
	tsB := ts.(int64)
	if _, exist := or.query[QueryTs]; !exist {
		LOG.Info("set query timestamp: %v", utils.ExtractTimestampForLog(tsB))
		or.UpdateQueryTimestamp(tsB)
	}
}

// UpdateQueryTimestamp replaces the timestamp clause of the query with
// {"$gt": ts} and logs the resulting query.
func (or *OplogReader) UpdateQueryTimestamp(ts int64) {
	or.query[QueryTs] = bson.M{QueryOpGT: utils.Int64ToTimestamp(ts)}

	LOG.Info("update or.query to %v", or.query)
}

// getQueryTimestamp returns the "$gt" timestamp currently stored in the
// query. It panics when the clause is missing or has a different shape, so a
// timestamp must have been set through UpdateQueryTimestamp beforehand.
func (or *OplogReader) getQueryTimestamp() int64 {
	return utils.TimeStampToInt64(or.query[QueryTs].(bson.M)[QueryOpGT].(primitive.Timestamp))
}

// Next returns the next oplog as raw bytes.
func (or *OplogReader) Next() ([]byte, error) {
	return or.get()
}

// NextOplog returns the next oplog as an oplog.GenericOplog holding both the
// raw bytes and the parsed form. A read error from Next, or a failure to
// decode the raw bytes, yields a nil log and a non-nil error.
func (or *OplogReader) NextOplog() (log *oplog.GenericOplog, err error) {
	var raw []byte
	if raw, err = or.Next(); err != nil {
		return nil, err
	}

	log = &oplog.GenericOplog{Raw: raw, Parsed: new(oplog.PartialLog)}
	// Do not hand out a partially decoded entry as if it were valid.
	if err = bson.Unmarshal(raw, log.Parsed); err != nil {
		return nil, fmt.Errorf("unmarshal oplog: %w", err)
	}
	return log, nil
}

// internal get next oplog. Used in Next() and NextOplog(). Both the channel
// and this function itself may return a timeout, which is acceptable.
func (or *OplogReader) get() (log []byte, err error) { select { case ret := <-or.oplogChan: return ret.log, ret.err case <-time.After(time.Second * time.Duration(conf.Options.IncrSyncReaderBufferTime)): return nil, TimeoutError } } // start fetcher if not exist func (or *OplogReader) StartFetcher() { if or.fetcherExist == true { return } or.fetcherLock.Lock() if or.fetcherExist == false { // double check or.fetcherExist = true go or.fetcher() } or.fetcherLock.Unlock() } // fetch oplog tp store disk queue or memory func (or *OplogReader) fetcher() { LOG.Info("start %s fetcher with src[%v] replica-name[%v] query-ts[%v]", or.String(), utils.BlockMongoUrlPassword(or.src, "***"), or.replset, or.query[QueryTs].(bson.M)[QueryOpGT].(primitive.Timestamp)) for { if err := or.EnsureNetwork(); err != nil { or.oplogChan <- &retOplog{nil, err} continue } if !or.oplogsCursor.Next(context.Background()) { if err := or.oplogsCursor.Err(); err != nil { // some internal error. need rebuild the oplogsCursor or.releaseCursor() if utils.IsCollectionCappedError(err) { // print it LOG.Error("oplog collection capped may happen: %v", err) or.oplogChan <- &retOplog{nil, CollectionCappedError} } else { or.oplogChan <- &retOplog{nil, fmt.Errorf("get next oplog failed, release oplogsIterator, %s", err.Error())} } // wait a moment time.Sleep(1 * time.Second) } else { // query timeout or.oplogChan <- &retOplog{nil, TimeoutError} } continue } or.oplogChan <- &retOplog{or.oplogsCursor.Current, nil} } } // ensureNetwork establish the mongodb connection at first // if current connection is not ready or disconnected func (or *OplogReader) EnsureNetwork() (err error) { if or.oplogsCursor != nil { return nil } LOG.Info("%s ensure network", or.String()) if or.conn == nil || (or.conn != nil && !or.conn.IsGood()) { if or.conn != nil { or.conn.Close() } // reconnect if or.conn, err = utils.NewMongoCommunityConn(or.src, conf.Options.MongoConnectMode, true, utils.ReadWriteConcernDefault, 
utils.ReadWriteConcernDefault, conf.Options.MongoSslRootCaFile); or.conn == nil || err != nil { err = fmt.Errorf("oplog_reader reconnect mongo instance [%s] error. %s", or.src, err.Error()) return err } } findOptions := options.Find().SetBatchSize(int32(BatchSize)). SetNoCursorTimeout(true). SetCursorType(options.Tailable). SetOplogReplay(true) var queryTs int64 // the given oplog timestamp shouldn't bigger than the newest if or.firstRead == true { // check whether the starting fetching timestamp is less than the newest timestamp exist in the oplog newestTs := or.getNewestTimestamp() queryTs = or.getQueryTimestamp() if newestTs < queryTs { LOG.Warn("oplog_reader current starting point[%v] is bigger than the newest timestamp[%v]!", utils.ExtractTimestampForLog(queryTs), utils.ExtractTimestampForLog(newestTs)) queryTs = newestTs } } /* * the given oplog timestamp shouldn't smaller than the oldest. * this may happen when collection capped. */ oldestTs := or.getOldestTimestamp() queryTs = or.getQueryTimestamp() if oldestTs > queryTs { if !or.firstRead { return CollectionCappedError } else { LOG.Warn("oplog_reader current starting point[%v] is smaller than the oldest timestamp[%v]!", utils.ExtractTimestampForLog(queryTs), utils.ExtractTimestampForLog(oldestTs)) } } or.firstRead = false or.oplogsCursor, err = or.conn.Client.Database(localDB).Collection(utils.OplogNS).Find(context.Background(), or.query, findOptions) if or.oplogsCursor == nil || err != nil { err = fmt.Errorf("oplog_reader Find mongo instance [%s] error. 
%s", or.src, err.Error()) LOG.Warn("oplog_reader failed err[%v] or.query[%v]", err, or.query) return err } LOG.Info("%s generates new cursor query[%v]", or.String(), or.query) return } // get newest oplog func (or *OplogReader) getNewestTimestamp() int64 { ts, _ := utils.GetNewestTimestampByConn(or.conn) return ts } // get oldest oplog func (or *OplogReader) getOldestTimestamp() int64 { ts, _ := utils.GetOldestTimestampByConn(or.conn) return ts } func (or *OplogReader) releaseCursor() { if or.oplogsCursor != nil { or.oplogsCursor.Close(context.Background()) } or.oplogsCursor = nil } func (or *OplogReader) FetchNewestTimestamp() (interface{}, error) { return nil, fmt.Errorf("interface not implement") } // GidOplogReader. query along with gid type GidOplogReader struct { OplogReader } func (reader *GidOplogReader) SetQueryGid(gid string) { reader.query[QueryGid] = gid } func NewGidOplogReader(src string) *GidOplogReader { return &GidOplogReader{ OplogReader: OplogReader{src: src, query: bson.M{}}, } }