func()

in nimo-shake/writer/mongodb_mgo_driver.go [288:356]


func (mw *MongoWriter) createSingleIndex(primaryIndexes []*dynamodb.KeySchemaElement, parseMap map[string]string,
	isPrimaryKey bool) (string, error) {
	primaryKey, sortKey, err := utils.ParsePrimaryAndSortKey(primaryIndexes, parseMap)
	if err != nil {
		return "", fmt.Errorf("parse primary and sort key failed[%v]", err)
	}

	primaryKeyWithType := mw.fetchKey(primaryKey, parseMap[primaryKey])
	indexList := make([]string, 0, 2)
	indexList = append(indexList, primaryKeyWithType)
	if sortKey != "" {
		indexList = append(indexList, mw.fetchKey(sortKey, parseMap[sortKey]))
	}

	LOG.Info("ns[%s] single index[%v] list[%v]", mw.ns, primaryKeyWithType, indexList)

	// primary key should be unique
	unique := isPrimaryKey

	// create union unique index
	if len(indexList) >= 2 {
		// write index
		index := mgo.Index{
			Key:        indexList,
			Background: true,
			Unique:     unique,
		}

		LOG.Info("create union-index isPrimary[%v]: %v", isPrimaryKey, index.Key)

		if err := mw.conn.Session.DB(mw.ns.Database).C(mw.ns.Collection).EnsureIndex(index); err != nil {
			return "", fmt.Errorf("create primary union unique[%v] index failed[%v]", unique, err)
		}
	}

	var indexType interface{}
	indexType = "hashed"
	if conf.Options.TargetMongoDBType == utils.TargetMongoDBTypeReplica {
		indexType = 1
	}
	if len(indexList) >= 2 {
		// unique has already be set on the above index
		unique = false
	} else if unique {
		// must be range if only has 1 key
		indexType = 1
	}

	doc := bson.D{
		{Name: "createIndexes", Value: mw.ns.Collection},
		{Name: "indexes", Value: []bson.M{
			{
				"key": bson.M{
					primaryKeyWithType: indexType,
				},
				"name":       fmt.Sprintf("%s_%v", primaryKeyWithType, indexType),
				"background": true,
				"unique":     unique,
			},
		}},
	}
	LOG.Info("create index isPrimary[%v]: %v", isPrimaryKey, doc)
	// create hash key only
	if err := mw.conn.Session.DB(mw.ns.Database).Run(doc, nil); err != nil {
		return "", fmt.Errorf("create primary[%v] %v index failed[%v]", isPrimaryKey, indexType, err)
	}

	return primaryKeyWithType, nil
}