collector/reader/event_reader.go (135 lines of code) (raw):
package sourceReader
// read change stream event from source mongodb
import (
"sync"
"time"
conf "github.com/alibaba/MongoShake/v2/collector/configure"
utils "github.com/alibaba/MongoShake/v2/common"
"fmt"
"github.com/alibaba/MongoShake/v2/collector/ckpt"
"github.com/alibaba/MongoShake/v2/collector/filter"
diskQueue "github.com/vinllen/go-diskqueue"
LOG "github.com/vinllen/log4go"
)
const (
ErrInvalidStartPosition = "resume point may no longer be in the oplog."
)
type EventReader struct {
// source mongo address url
src string
replset string
// mongo client
client *utils.ChangeStreamConn
// stage of fetch and store oplog
fetchStage int32
// disk queue used to store oplog temporarily
diskQueue *diskQueue.DiskQueue
disQueueMutex sync.Mutex // disk queue mutex
// start at operation time
startAtOperationTime interface{}
// event channel
eventChan chan *retOplog
fetcherExist bool
fetcherLock sync.Mutex
firstRead bool
diskQueueLastTs int64 // the last oplog timestamp in disk queue
}
// NewEventReader creates reader with mongodb url
func NewEventReader(src string, replset string) *EventReader {
return &EventReader{
src: src,
replset: replset,
eventChan: make(chan *retOplog, ChannelSize),
firstRead: true,
diskQueueLastTs: -1,
}
}
func (er *EventReader) String() string {
return fmt.Sprintf("EventReader[src:%s replset:%s]", utils.BlockMongoUrlPassword(er.src, "***"),
er.replset)
}
func (er *EventReader) Name() string {
return utils.VarIncrSyncMongoFetchMethodChangeStream
}
// SetQueryTimestampOnEmpty set internal timestamp if
// not exist in this or. initial stage most of the time
func (er *EventReader) SetQueryTimestampOnEmpty(ts interface{}) {
if er.startAtOperationTime == nil && ts != ckpt.InitCheckpoint {
LOG.Info("EventReader set query timestamp: %v", utils.ExtractTimestampForLog(ts))
if val, ok := ts.(int64); ok {
er.startAtOperationTime = val
} else if val2, ok := ts.(int64); ok {
er.startAtOperationTime = val2
} else {
// ResumeToken
er.startAtOperationTime = ts
}
}
}
func (er *EventReader) UpdateQueryTimestamp(ts int64) {
er.startAtOperationTime = ts
}
func (er *EventReader) getQueryTimestamp() int64 {
return er.startAtOperationTime.(int64)
}
// Next returns an oplog by raw bytes which is []byte
func (er *EventReader) Next() ([]byte, error) {
return er.get()
}
func (er *EventReader) get() ([]byte, error) {
select {
case ret := <-er.eventChan:
return ret.log, ret.err
case <-time.After(time.Second * time.Duration(conf.Options.IncrSyncReaderBufferTime)):
return nil, TimeoutError
}
}
// start fetcher if not exist
func (er *EventReader) StartFetcher() {
if er.fetcherExist == true {
return
}
er.fetcherLock.Lock()
if er.fetcherExist == false { // double check
er.fetcherExist = true
go er.fetcher()
}
er.fetcherLock.Unlock()
}
// fetch change stream event tp store disk queue or memory
func (er *EventReader) fetcher() {
LOG.Info("start %s fetcher with src[%v] replica-name[%v] query-ts[%v]",
er.String(), utils.BlockMongoUrlPassword(er.src, "***"), er.replset,
utils.ExtractTimestampForLog(er.startAtOperationTime))
for {
if err := er.EnsureNetwork(); err != nil {
er.eventChan <- &retOplog{nil, err}
continue
}
ok, data := er.client.GetNext()
if !ok {
err := er.client.CsHandler.Err()
// no data
er.client.Close()
LOG.Error("change stream reader hit the end: %v", err)
time.Sleep(1 * time.Second)
continue
}
er.eventChan <- &retOplog{data, nil}
}
}
func (er *EventReader) EnsureNetwork() error {
if er.client != nil && er.client.IsNotNil() {
return nil
}
LOG.Info("%s ensure network", er.String())
if er.client != nil {
er.client.Close() // close old client
}
filterList := filter.NewDocFilterList()
var err error
if er.client, err = utils.NewChangeStreamConn(er.src,
conf.Options.MongoConnectMode,
conf.Options.IncrSyncChangeStreamWatchFullDocument,
conf.Options.SpecialSourceDBFlag,
filterList.IterateFilter,
er.startAtOperationTime,
int32(BatchSize),
conf.Options.SourceDBVersion,
conf.Options.MongoSslRootCaFile); err != nil {
return err
}
return nil
}
func (er *EventReader) FetchNewestTimestamp() (interface{}, error) {
if err := er.EnsureNetwork(); err != nil {
return "", err
}
// non-blocking, and return is useless
er.client.TryNext()
return er.client.ResumeToken(), nil
}