func()

in domain/model.go [126:176]


// GetOnlineFeaturesWithEntity fetches online features for one feature entity
// and a batch of join id values.
//
// joinIds maps a join id name to the values to look up; only the entry keyed
// by the entity's FeatureEntityJoinid is read. featureEntityName selects the
// entity in m.featureEntityMap.
//
// The returned slice holds one map per requested value, in request order. Each
// map merges every row, returned by the feature views registered under that
// join id in m.featureEntityJoinIdMap, whose join id field stringifies to the
// same text as the requested value. A value with no matching row yields an
// empty map.
//
// An error is returned only when the entity name is unknown or joinIds has no
// entry for the entity's join id. A failing feature view is printed to stdout
// and does not fail the call.
func (m *Model) GetOnlineFeaturesWithEntity(joinIds map[string][]interface{}, featureEntityName string) ([]map[string]interface{}, error) {
	featureEntity, ok := m.featureEntityMap[featureEntityName]
	if !ok {
		return nil, fmt.Errorf("feature entity name:%s not found", featureEntityName)
	}

	joinId := featureEntity.FeatureEntityJoinid
	keys, ok := joinIds[joinId]
	if !ok {
		return nil, fmt.Errorf("join id:%s not found", joinId)
	}
	if len(keys) == 0 {
		// Nothing was requested, so nothing can match: skip the fetches and
		// return the same non-nil empty slice as the general path would.
		return []map[string]interface{}{}, nil
	}

	var (
		wg   sync.WaitGroup
		mu   sync.Mutex // guards rows
		rows []map[string]interface{}
	)

	// Query every feature view concurrently and pool the rows they return.
	for _, featureView := range m.featureEntityJoinIdMap[joinId] {
		wg.Add(1)
		go func(featureView FeatureView) {
			defer wg.Done()
			name := featureView.GetName()
			features, err := featureView.GetOnlineFeatures(keys, m.featureNamesMap[name], m.aliasNamesMap[name])
			if err != nil {
				// Best effort: report the failure and carry on; whatever rows
				// came back alongside the error are still merged below.
				fmt.Println(err)
			}
			mu.Lock()
			rows = append(rows, features...)
			mu.Unlock()
		}(featureView)
	}
	wg.Wait()

	// Index rows by their stringified join id so each requested value is
	// resolved with one lookup instead of a scan over all rows. Rows sharing a
	// key keep their pooled order, so later rows still override earlier ones
	// when merged.
	rowsByKey := make(map[string][]map[string]interface{}, len(keys))
	for _, row := range rows {
		key := utils.ToString(row[joinId], "")
		rowsByKey[key] = append(rowsByKey[key], row)
	}

	featuresResult := make([]map[string]interface{}, 0, len(keys))
	for _, joinIdValue := range keys {
		features := make(map[string]interface{}, len(m.Features))
		// The second argument here is " " while rows are indexed with "".
		// NOTE(review): presumably that argument is ToString's fallback, which
		// would keep a row without a join id from matching a request value
		// that also falls back - confirm against utils.ToString.
		for _, row := range rowsByKey[utils.ToString(joinIdValue, " ")] {
			for k, v := range row {
				features[k] = v
			}
		}
		featuresResult = append(featuresResult, features)
	}
	return featuresResult, nil
}