func StartIndexSync()

in collector/docsyncer/doc_syncer.go [206:287]


// StartIndexSync creates the indexes listed in indexMap on the destination
// MongoDB addressed by toUrl.
//
// It returns nil immediately when conf.Options.FullSyncExecutorDebug is set or
// when indexMap is empty. Otherwise the namespaces are fanned out to
// conf.Options.FullSyncReaderCollectionParallel workers, each of which opens
// its own connection. When nsTrans is non-nil, the destination namespace is
// obtained by transforming the source namespace through it.
//
// For every index spec: specs for which utils.HaveIdIndexKey reports true are
// skipped, the "ns", "v" and "background" fields are dropped, and "background"
// is re-added with the value of the background argument before a
// createIndexes command is issued.
//
// A failed createIndexes command is only logged as a warning. A non-nil error
// is returned only when no worker managed to connect to the destination, in
// which case no index was written; the error is the first connection error
// observed.
func StartIndexSync(indexMap map[utils.NS][]bson.D, toUrl string,
	nsTrans *transform.NamespaceTransform, background bool) (syncError error) {
	if conf.Options.FullSyncExecutorDebug {
		LOG.Info("full_sync.executor.debug set, no need to sync index")
		return nil
	}

	type IndexNS struct {
		ns        utils.NS
		indexList []bson.D
	}

	LOG.Info("start writing index with background[%v], indexMap length[%v]", background, len(indexMap))
	if len(indexMap) == 0 {
		LOG.Info("finish writing index, but no data")
		return nil
	}

	collExecutorParallel := conf.Options.FullSyncReaderCollectionParallel
	namespaces := make(chan *IndexNS, collExecutorParallel)

	// done is closed when this function returns, so the producer below can
	// never stay blocked on a send once nobody is left to receive (e.g. every
	// worker failed to connect).
	done := make(chan struct{})
	defer close(done)

	nimo.GoRoutine(func() {
		defer close(namespaces)
		for ns, indexList := range indexMap {
			select {
			case namespaces <- &IndexNS{ns: ns, indexList: indexList}:
			case <-done:
				return
			}
		}
	})

	var (
		wg        sync.WaitGroup
		mu        sync.Mutex // guards connected and connErr
		connected int        // number of workers that obtained a connection
		connErr   error      // first connection error seen by any worker
	)
	wg.Add(collExecutorParallel)
	for i := 0; i < collExecutorParallel; i++ {
		nimo.GoRoutine(func() {
			// Registered first so that wg.Wait() is released on every exit
			// path, including a failed connection attempt.
			defer wg.Done()

			conn, err := utils.NewMongoCommunityConn(toUrl, utils.VarMongoConnectModePrimary, true,
				utils.ReadWriteConcernLocal, utils.ReadWriteConcernMajority, conf.Options.TunnelMongoSslRootCaFile)
			if err != nil {
				LOG.Error("write index but create client fail: %v", err)
				mu.Lock()
				if connErr == nil {
					connErr = err
				}
				mu.Unlock()
				return
			}
			defer conn.Close()

			mu.Lock()
			connected++
			mu.Unlock()

			for indexNs := range namespaces {
				ns := indexNs.ns
				toNS := ns
				if nsTrans != nil {
					toNS = utils.NewNS(nsTrans.Transform(ns.Str()))
				}

				for _, index := range indexNs.indexList {
					// ignore _id
					if utils.HaveIdIndexKey(index) {
						continue
					}

					// Rebuild the spec without the fields that are replaced
					// ("background") or not forwarded ("ns", "v").
					newIndex := bson.D{}
					for _, v := range index {
						if v.Key == "ns" || v.Key == "v" || v.Key == "background" {
							continue
						}
						newIndex = append(newIndex, v)
					}
					newIndex = append(newIndex, primitive.E{Key: "background", Value: background})
					if out := conn.Client.Database(toNS.Database).RunCommand(nil, bson.D{
						{Key: "createIndexes", Value: toNS.Collection},
						{Key: "indexes", Value: []bson.D{newIndex}},
					}); out.Err() != nil {
						LOG.Warn("Create indexes for ns %v of dest mongodb failed. %v", ns, out.Err())
					}
				}
				LOG.Info("Create indexes for ns %v of dest mongodb finish", toNS)
			}
		})
	}

	wg.Wait()

	// A connected worker only exits once the channel is closed, i.e. after
	// every namespace has been handed out. So a single successful connection
	// is enough for all work to have been processed; only the "nobody
	// connected" case means nothing was written.
	if connected == 0 && connErr != nil {
		return connErr
	}

	LOG.Info("finish writing index")
	return syncError
}