in collector/reader/oplog_reader.go [177:241]
// EnsureNetwork makes sure the reader holds an open cursor on the oplog
// collection, (re)connecting to the source first when necessary.
//
// It is a no-op returning nil when or.oplogsCursor is already set. Otherwise it:
//  1. reconnects if there is no connection or the current one is not good;
//  2. on the first read, warns when the starting timestamp is newer than the
//     newest oplog timestamp;
//  3. compares the starting timestamp with the oldest oplog timestamp: on the
//     first read this only logs a warning, on later reads it returns
//     CollectionCappedError;
//  4. issues the tailable Find with or.query and stores the resulting cursor.
//
// A non-nil error is returned when reconnecting or Find fails (including the
// case where no connection/cursor is returned without an error), or when the
// collection has been capped past the current position.
func (or *OplogReader) EnsureNetwork() (err error) {
	if or.oplogsCursor != nil {
		return nil
	}

	LOG.Info("%s ensure network", or.String())

	if or.conn == nil || !or.conn.IsGood() {
		if or.conn != nil {
			or.conn.Close()
		}

		// reconnect
		or.conn, err = utils.NewMongoCommunityConn(or.src, conf.Options.MongoConnectMode, true,
			utils.ReadWriteConcernDefault, utils.ReadWriteConcernDefault,
			conf.Options.MongoSslRootCaFile)
		if or.conn == nil || err != nil {
			// err may be nil when only the connection is missing; never
			// dereference it blindly.
			detail := "connection is nil"
			if err != nil {
				detail = err.Error()
			}
			return fmt.Errorf("oplog_reader reconnect mongo instance [%s] error. %s", or.src, detail)
		}
	}

	findOptions := options.Find().SetBatchSize(int32(BatchSize)).
		SetNoCursorTimeout(true).
		SetCursorType(options.Tailable).
		SetOplogReplay(true)

	// The given oplog timestamp shouldn't be bigger than the newest one.
	// This check is advisory only: the query that is actually sent is
	// or.query, which is not modified here.
	if or.firstRead {
		newestTs := or.getNewestTimestamp()
		startTs := or.getQueryTimestamp()
		if newestTs < startTs {
			LOG.Warn("oplog_reader current starting point[%v] is bigger than the newest timestamp[%v]!",
				utils.ExtractTimestampForLog(startTs), utils.ExtractTimestampForLog(newestTs))
		}
	}

	// The given oplog timestamp shouldn't be smaller than the oldest one.
	// This may happen when the collection is capped.
	oldestTs := or.getOldestTimestamp()
	queryTs := or.getQueryTimestamp()
	if oldestTs > queryTs {
		if !or.firstRead {
			return CollectionCappedError
		}
		LOG.Warn("oplog_reader current starting point[%v] is smaller than the oldest timestamp[%v]!",
			utils.ExtractTimestampForLog(queryTs), utils.ExtractTimestampForLog(oldestTs))
	}

	or.firstRead = false

	or.oplogsCursor, err = or.conn.Client.Database(localDB).Collection(utils.OplogNS).Find(context.Background(),
		or.query, findOptions)
	if or.oplogsCursor == nil || err != nil {
		// Same guard as for the reconnect: a nil cursor without an error must
		// not cause a nil-pointer dereference.
		detail := "cursor is nil"
		if err != nil {
			detail = err.Error()
		}
		err = fmt.Errorf("oplog_reader Find mongo instance [%s] error. %s", or.src, detail)
		LOG.Warn("oplog_reader failed err[%v] or.query[%v]", err, or.query)
		return err
	}

	LOG.Info("%s generates new cursor query[%v]", or.String(), or.query)
	return nil
}