func()

in nimo-shake/writer/mongodb_community_driver.go [308:378]


// createSingleIndex creates the MongoDB index(es) that correspond to one
// DynamoDB key schema on the collection identified by mcw.ns.
//
// The partition and sort key names are resolved from primaryIndexes and
// parseMap via utils.ParsePrimaryAndSortKey, mapped to target field names with
// mcw.fetchKey and then passed through conf.ConvertIdFunc.
//
// Indexes created, in order:
//  1. If a sort key is present, a compound ascending index over
//     (partition key, sort key). It is unique when isPrimaryKey is true.
//  2. Always, a single-field index on the partition key. It is ascending when
//     the target is a replica set or when it has to be unique; otherwise it is
//     "hashed". It is unique only when isPrimaryKey is true and no compound
//     index was created in step 1.
//
// Both indexes are requested with the background option set.
//
// On success it returns the converted partition-key field name. On any failure
// it returns an empty string and an error describing the failing step.
func (mcw *MongoCommunityWriter) createSingleIndex(primaryIndexes []*dynamodb.KeySchemaElement, parseMap map[string]string,
	isPrimaryKey bool) (string, error) {
	primaryKey, sortKey, err := utils.ParsePrimaryAndSortKey(primaryIndexes, parseMap)
	if err != nil {
		return "", fmt.Errorf("parse primary and sort key failed[%v]", err)
	}

	// Resolve the target field names: partition key first, optional sort key second.
	leadField := conf.ConvertIdFunc(mcw.fetchKey(primaryKey, parseMap[primaryKey]))
	fields := append(make([]string, 0, 2), leadField)
	if sortKey != "" {
		fields = append(fields, conf.ConvertIdFunc(mcw.fetchKey(sortKey, parseMap[sortKey])))
	}
	compound := len(fields) >= 2

	LOG.Info("ns[%s] single index[%v] list[%v]", mcw.ns, leadField, fields)

	ctx := context.Background()

	// createIndex issues one background index build with the given key
	// specification and uniqueness on the target collection.
	createIndex := func(keys interface{}, unique bool) error {
		opts := &options.IndexOptions{}
		opts.SetUnique(unique)
		opts.SetBackground(true)
		_, createErr := mcw.conn.Client.Database(mcw.ns.Database).Collection(mcw.ns.Collection).Indexes().
			CreateOne(ctx, mongo.IndexModel{Keys: keys, Options: opts})
		return createErr
	}

	// With a sort key, the uniqueness constraint lives on the compound index.
	if compound {
		LOG.Info("create union-index isPrimary[%v]: %v", isPrimaryKey, fields)

		// bson2.D keeps the field order: partition key first, then sort key.
		var compoundKeys bson2.D
		for _, field := range fields {
			compoundKeys = append(compoundKeys, bson2.E{Key: field, Value: 1})
		}
		if createErr := createIndex(compoundKeys, isPrimaryKey); createErr != nil {
			return "", fmt.Errorf("create primary union unique[%v] index failed[%v]", isPrimaryKey, createErr)
		}
	}

	// The single-field index is unique only when no compound index already
	// carries the constraint.
	singleUnique := isPrimaryKey && !compound

	// Default to a hashed index; use an ascending (range) index for replica
	// targets and whenever the index must be unique.
	var indexType interface{} = "hashed"
	if conf.Options.TargetMongoDBType == utils.TargetMongoDBTypeReplica || singleUnique {
		indexType = 1
	}

	if err = createIndex(bson2.M{leadField: indexType}, singleUnique); err != nil {
		return "", fmt.Errorf("create primary[%v] %v index failed[%v]", isPrimaryKey, indexType, err)
	}

	return leadField, nil
}