func()

in collector/docsyncer/doc_reader.go [94:172]


// Run creates the DocumentReader(s) for namespace ds.ns and sends them on
// ds.readerChan. The channel is closed when Run returns.
//
// If conf.Options.FullSyncReaderParallelThread <= 1, a single reader without
// boundaries is sent. Otherwise the splitVector command is run with
// conf.Options.FullSyncReaderParallelIndex as the key pattern, and one reader
// is sent per range delimited by the returned split keys, plus a final reader
// for the open-ended tail. ds.pieceNumber is set to the number of ranges.
//
// If splitVector fails, or returns no split keys, Run logs a warning and falls
// back to a single reader without boundaries. A split key that cannot be
// parsed, or whose field name differs from the configured index, is reported
// through LOG.Crash.
//
// Run always returns nil.
func (ds *DocumentSplitter) Run() error {
	// closing the channel signals that no more readers will be sent
	defer close(ds.readerChan)

	// emitSingleReader sends the one reader without boundaries that is used
	// whenever parallel fetching is disabled or not possible.
	emitSingleReader := func() error {
		ds.readerChan <- NewDocumentReader(0, ds.src, ds.ns, "", nil, nil, ds.sslRootCaFile)
		LOG.Info("splitter[%s] exits", ds)
		return nil
	}

	// disable split
	if conf.Options.FullSyncReaderParallelThread <= 1 {
		LOG.Info("splitter[%s] disable split or no need", ds)
		return emitSingleReader()
	}

	LOG.Info("splitter[%s] enable split, waiting splitVector return...", ds)

	// NOTE(review): a nil context is passed to RunCommand; consider
	// context.Background() once the "context" package is imported by this file.
	var res bson.M
	err := ds.client.Client.Database(ds.ns.Database).RunCommand(nil, bson.D{
		{"splitVector", ds.ns.Str()},
		{"keyPattern", bson.M{conf.Options.FullSyncReaderParallelIndex: 1}},
		// {"maxSplitPoints", ds.pieceNumber - 1},
		{"maxChunkSize", ds.pieceByteSize / utils.MB},
	}).Decode(&res) // must be a pointer: decoding into a nil map value always fails
	// if failed, do not panic, run single thread fetching
	if err != nil {
		LOG.Warn("splitter[%s] run splitVector failed[%v], give up parallel fetching", ds, err)
		return emitSingleReader()
	}

	LOG.Info("splitter[%s] run splitVector result: %v", ds, res)

	// Extract the split keys. Arrays nested in a bson.M are decoded by the
	// driver as bson.A, which is a distinct type from []interface{}, so both
	// representations are accepted.
	var splitKeysList []interface{}
	if splitKeys, ok := res["splitKeys"]; ok {
		switch keys := splitKeys.(type) {
		case bson.A:
			splitKeysList = keys
		case []interface{}:
			splitKeysList = keys
		}
		if len(splitKeysList) == 0 {
			LOG.Warn("splitter[%s] run splitVector return empty result[%v]", ds, res)
		}
	} else {
		LOG.Warn("splitter[%s] run splitVector return null result[%v]", ds, res)
	}

	if len(splitKeysList) == 0 {
		LOG.Warn("splitter[%s] give up parallel fetching", ds)
		return emitSingleReader()
	}

	// return list is sorted
	ds.pieceNumber = len(splitKeysList) + 1

	// start is the lower boundary of the next piece; nil for the first piece
	var start interface{}
	lastIdx := len(splitKeysList) - 1
	for i, keyDoc := range splitKeysList {
		// check key == conf.Options.FullSyncReaderParallelIndex
		key, val, err := parseDocKeyValue(keyDoc)
		if err != nil {
			LOG.Crash("splitter[%s] parse doc key failed: %v", ds, err)
		}
		if key != conf.Options.FullSyncReaderParallelIndex {
			LOG.Crash("splitter[%s] parse doc invalid key: %v", ds, key)
		}

		LOG.Info("splitter[%s] piece[%d] create reader with boundary(%v, %v]", ds, i, start, val)
		// inject new DocumentReader into channel
		ds.readerChan <- NewDocumentReader(i, ds.src, ds.ns, key, start, val, ds.sslRootCaFile)

		// the upper boundary of this piece is the lower boundary of the next
		start = val

		// after the last split key, emit the open-ended tail piece
		if i == lastIdx {
			LOG.Info("splitter[%s] piece[%d] create reader with boundary(%v, INF)", ds, i+1, start)
			ds.readerChan <- NewDocumentReader(i+1, ds.src, ds.ns, key, start, nil, ds.sslRootCaFile)
		}
	}

	return nil
}