in collector/docsyncer/doc_syncer.go [206:287]
func StartIndexSync(indexMap map[utils.NS][]bson.D, toUrl string,
nsTrans *transform.NamespaceTransform, background bool) (syncError error) {
if conf.Options.FullSyncExecutorDebug {
LOG.Info("full_sync.executor.debug set, no need to sync index")
return nil
}
type IndexNS struct {
ns utils.NS
indexList []bson.D
}
LOG.Info("start writing index with background[%v], indexMap length[%v]", background, len(indexMap))
if len(indexMap) == 0 {
LOG.Info("finish writing index, but no data")
return nil
}
collExecutorParallel := conf.Options.FullSyncReaderCollectionParallel
namespaces := make(chan *IndexNS, collExecutorParallel)
nimo.GoRoutine(func() {
for ns, indexList := range indexMap {
namespaces <- &IndexNS{ns: ns, indexList: indexList}
}
close(namespaces)
})
var wg sync.WaitGroup
wg.Add(collExecutorParallel)
for i := 0; i < collExecutorParallel; i++ {
nimo.GoRoutine(func() {
var conn *utils.MongoCommunityConn
var err error
if conn, err = utils.NewMongoCommunityConn(toUrl, utils.VarMongoConnectModePrimary, true,
utils.ReadWriteConcernLocal, utils.ReadWriteConcernMajority, conf.Options.TunnelMongoSslRootCaFile); err != nil {
LOG.Error("write index but create client fail: %v", err)
return
}
defer conn.Close()
defer wg.Done()
for {
indexNs, ok := <-namespaces
if !ok {
break
}
ns := indexNs.ns
toNS := ns
if nsTrans != nil {
toNS = utils.NewNS(nsTrans.Transform(ns.Str()))
}
for _, index := range indexNs.indexList {
// ignore _id
if utils.HaveIdIndexKey(index) {
continue
}
newIndex := bson.D{}
for _, v := range index {
if v.Key == "ns" || v.Key == "v" || v.Key == "background" {
continue
}
newIndex = append(newIndex, v)
}
newIndex = append(newIndex, primitive.E{Key: "background", Value: background})
if out := conn.Client.Database(toNS.Database).RunCommand(nil, bson.D{
{"createIndexes", toNS.Collection},
{"indexes", []bson.D{newIndex}},
}); out.Err() != nil {
LOG.Warn("Create indexes for ns %v of dest mongodb failed. %v", ns, out.Err())
}
}
LOG.Info("Create indexes for ns %v of dest mongodb finish", toNS)
}
})
}
wg.Wait()
LOG.Info("finish writing index")
return syncError
}