func()

in collector/reader/oplog_reader.go [177:241]


// EnsureNetwork makes sure the reader has an open cursor on the oplog
// collection, (re)connecting to the source first when necessary.
//
// Steps, in order:
//  1. If or.oplogsCursor is already set, return nil immediately.
//  2. If or.conn is nil or reports !IsGood(), close the old connection (if
//     any) and create a new one with utils.NewMongoCommunityConn.
//  3. On the first read only, warn when the query timestamp is newer than
//     the newest timestamp reported by getNewestTimestamp.
//  4. Compare the query timestamp with the oldest timestamp reported by
//     getOldestTimestamp. When the oldest one is newer, return
//     CollectionCappedError — except on the first read, where only a
//     warning is logged.
//  5. Run Find with or.query using a tailable, no-timeout, oplog-replay
//     cursor and store the result in or.oplogsCursor.
//
// It returns nil on success, CollectionCappedError as described above, or a
// formatted error when reconnecting or creating the cursor fails.
func (or *OplogReader) EnsureNetwork() (err error) {
	if or.oplogsCursor != nil {
		return nil
	}
	LOG.Info("%s ensure network", or.String())

	if or.conn == nil || !or.conn.IsGood() {
		if or.conn != nil {
			or.conn.Close()
		}
		// reconnect
		or.conn, err = utils.NewMongoCommunityConn(or.src, conf.Options.MongoConnectMode, true,
			utils.ReadWriteConcernDefault, utils.ReadWriteConcernDefault,
			conf.Options.MongoSslRootCaFile)
		if or.conn == nil || err != nil {
			// err may be nil when only the connection is missing; never
			// dereference it blindly.
			reason := "nil connection returned without error"
			if err != nil {
				reason = err.Error()
			}
			return fmt.Errorf("oplog_reader reconnect mongo instance [%s] error. %s", or.src, reason)
		}
	}

	findOptions := options.Find().SetBatchSize(int32(BatchSize)).
		SetNoCursorTimeout(true).
		SetCursorType(options.Tailable).
		SetOplogReplay(true)

	// The given oplog timestamp shouldn't be bigger than the newest one.
	// NOTE(review): this check is advisory only. or.query is not modified
	// here, so the cursor below is still opened with the original starting
	// point. Confirm whether the start was meant to be clamped to newestTs.
	if or.firstRead {
		newestTs := or.getNewestTimestamp()
		if startTs := or.getQueryTimestamp(); newestTs < startTs {
			LOG.Warn("oplog_reader current starting point[%v] is bigger than the newest timestamp[%v]!",
				utils.ExtractTimestampForLog(startTs), utils.ExtractTimestampForLog(newestTs))
		}
	}

	// The given oplog timestamp shouldn't be smaller than the oldest one.
	// This may happen when the (capped) oplog collection has rolled over.
	oldestTs := or.getOldestTimestamp()
	queryTs := or.getQueryTimestamp()
	if oldestTs > queryTs {
		if !or.firstRead {
			return CollectionCappedError
		}
		// On the very first read a too-old starting point is tolerated.
		LOG.Warn("oplog_reader current starting point[%v] is smaller than the oldest timestamp[%v]!",
			utils.ExtractTimestampForLog(queryTs), utils.ExtractTimestampForLog(oldestTs))
	}
	or.firstRead = false

	or.oplogsCursor, err = or.conn.Client.Database(localDB).Collection(utils.OplogNS).Find(context.Background(),
		or.query, findOptions)
	if or.oplogsCursor == nil || err != nil {
		// Same guard as for the reconnect: a nil cursor may come with a nil err.
		reason := "nil cursor returned without error"
		if err != nil {
			reason = err.Error()
		}
		err = fmt.Errorf("oplog_reader Find mongo instance [%s] error. %s", or.src, reason)
		LOG.Warn("oplog_reader failed err[%v] or.query[%v]", err, or.query)
		return err
	}

	LOG.Info("%s generates new cursor query[%v]", or.String(), or.query)

	return nil
}