in domain/model.go [126:176]
// GetOnlineFeaturesWithEntity fetches online features for one feature entity
// and returns one merged feature map per requested join id value.
//
// joinIds maps a join id name to the values to look up. It must contain an
// entry for the join id of the entity registered under featureEntityName;
// otherwise an error is returned. An error is also returned when
// featureEntityName is not present in the model's feature entity map.
//
// Every feature view registered for the entity's join id is queried in its own
// goroutine. The result has the same length and order as the join id values
// that were passed in; a value for which no row was fetched yields an empty
// map.
//
// Fetching is best-effort: an error from a feature view is printed and does
// not fail the call, so the result may be missing that view's features.
func (m *Model) GetOnlineFeaturesWithEntity(joinIds map[string][]interface{}, featureEntityName string) ([]map[string]interface{}, error) {
	featureEntity, ok := m.featureEntityMap[featureEntityName]
	if !ok {
		return nil, fmt.Errorf("feature entity name:%s not found", featureEntityName)
	}

	joinId := featureEntity.FeatureEntityJoinid
	keys, ok := joinIds[joinId]
	if !ok {
		return nil, fmt.Errorf("join id:%s not found", joinId)
	}

	var (
		wg   sync.WaitGroup
		mu   sync.Mutex // guards rows
		rows []map[string]interface{}
	)
	for _, featureView := range m.featureEntityJoinIdMap[joinId] {
		wg.Add(1)
		go func(featureView FeatureView) {
			defer wg.Done()
			viewName := featureView.GetName()
			features, err := featureView.GetOnlineFeatures(keys, m.featureNamesMap[viewName], m.aliasNamesMap[viewName])
			if err != nil {
				// Best-effort: report the failure and keep whatever rows
				// the other feature views return.
				fmt.Println(err)
			}
			mu.Lock()
			rows = append(rows, features...)
			mu.Unlock()
		}(featureView)
	}
	wg.Wait()

	// Index the fetched rows by the string form of their join id value so each
	// requested key is resolved with a single lookup instead of a scan over
	// every row. Rows sharing a key keep their fetch order, so later rows
	// still overwrite earlier ones when merged below.
	rowsByKey := make(map[string][]map[string]interface{}, len(rows))
	for _, row := range rows {
		rowKey := utils.ToString(row[joinId], "")
		rowsByKey[rowKey] = append(rowsByKey[rowKey], row)
	}

	featuresResult := make([]map[string]interface{}, 0, len(keys))
	for _, key := range keys {
		features := make(map[string]interface{}, len(m.Features))
		// NOTE(review): the fallback here (" ") intentionally differs from the
		// one used for rows (""), as in the original comparison; presumably
		// this keeps values ToString cannot convert from matching each other.
		// Confirm against utils.ToString.
		for _, row := range rowsByKey[utils.ToString(key, " ")] {
			for name, value := range row {
				features[name] = value
			}
		}
		featuresResult = append(featuresResult, features)
	}
	return featuresResult, nil
}