in collector/docsyncer/doc_reader.go [94:172]
// Run splits the namespace ds.ns into key ranges and pushes one
// DocumentReader per range into ds.readerChan. The channel is closed when Run
// returns.
//
// If conf.Options.FullSyncReaderParallelThread <= 1, or if the splitVector
// command fails or yields no split keys, a single reader without boundaries
// is emitted instead. Otherwise splitVector is run with keyPattern
// {conf.Options.FullSyncReaderParallelIndex: 1} and maxChunkSize derived from
// ds.pieceByteSize. The returned split keys k1..kn produce n+1 readers with
// boundaries (nil, k1], (k1, k2], ..., (kn, INF), and ds.pieceNumber is set
// to n+1.
//
// Run always returns nil. A split key that cannot be parsed, or whose field
// is not the configured index, is reported through LOG.Crash.
func (ds *DocumentSplitter) Run() error {
	// close channel so consumers know no more readers will arrive
	defer close(ds.readerChan)

	// runSingle emits one reader with no key boundaries (the whole namespace).
	runSingle := func() error {
		ds.readerChan <- NewDocumentReader(0, ds.src, ds.ns, "", nil, nil, ds.sslRootCaFile)
		LOG.Info("splitter[%s] exits", ds)
		return nil
	}

	// disable split
	if conf.Options.FullSyncReaderParallelThread <= 1 {
		LOG.Info("splitter[%s] disable split or no need", ds)
		return runSingle()
	}

	LOG.Info("splitter[%s] enable split, waiting splitVector return...", ds)
	var res bson.M
	// Decode must receive a pointer: passing the nil map by value is rejected
	// by the driver, which previously made every splitVector call "fail".
	err := ds.client.Client.Database(ds.ns.Database).RunCommand(nil, bson.D{
		{"splitVector", ds.ns.Str()},
		{"keyPattern", bson.M{conf.Options.FullSyncReaderParallelIndex: 1}},
		// {"maxSplitPoints", ds.pieceNumber - 1},
		{"maxChunkSize", ds.pieceByteSize / utils.MB},
	}).Decode(&res)
	// if failed, do not panic, run single thread fetching
	if err != nil {
		LOG.Warn("splitter[%s] run splitVector failed[%v], give up parallel fetching", ds, err)
		return runSingle()
	}
	LOG.Info("splitter[%s] run splitVector result: %v", ds, res)

	splitKeys, ok := res["splitKeys"]
	if !ok {
		LOG.Warn("splitter[%s] run splitVector return null result[%v]", ds, res)
		LOG.Warn("splitter[%s] give up parallel fetching", ds)
		return runSingle()
	}

	// The mongo driver decodes a BSON array nested in a bson.M as bson.A (a
	// named []interface{} type), which a plain []interface{} assertion does
	// not match; accept both forms.
	var splitKeysList []interface{}
	switch v := splitKeys.(type) {
	case bson.A:
		splitKeysList = v
	case []interface{}:
		splitKeysList = v
	}
	if len(splitKeysList) == 0 {
		LOG.Warn("splitter[%s] run splitVector return empty result[%v]", ds, res)
		LOG.Warn("splitter[%s] give up parallel fetching", ds)
		return runSingle()
	}

	// return list is sorted, so consecutive keys form adjacent ranges
	ds.pieceNumber = len(splitKeysList) + 1
	var start interface{}
	for i, keyDoc := range splitKeysList {
		// presumably parseDocKeyValue yields the (field, value) pair of the
		// split-key document; the field must be the configured split index
		key, val, err := parseDocKeyValue(keyDoc)
		if err != nil {
			LOG.Crash("splitter[%s] parse doc key failed: %v", ds, err)
		}
		if key != conf.Options.FullSyncReaderParallelIndex {
			LOG.Crash("splitter[%s] parse doc invalid key: %v", ds, key)
		}

		LOG.Info("splitter[%s] piece[%d] create reader with boundary(%v, %v]", ds, i, start, val)
		ds.readerChan <- NewDocumentReader(i, ds.src, ds.ns, key, start, val, ds.sslRootCaFile)
		start = val

		// the last split key also opens the final, upper-unbounded piece
		if i == len(splitKeysList)-1 {
			LOG.Info("splitter[%s] piece[%d] create reader with boundary(%v, INF)", ds, i+1, start)
			ds.readerChan <- NewDocumentReader(i+1, ds.src, ds.ns, key, start, nil, ds.sslRootCaFile)
		}
	}

	LOG.Info("splitter[%s] exits", ds)
	return nil
}