in domain/model.go [65:124]
// GetOnlineFeatures fetches online features from the feature views registered
// for the requested join ids and merges them into one feature map per row.
//
// joinIds maps a join id name to the list of key values to look up. Every join
// id in m.featureEntityJoinIdList must be present in joinIds, and all of those
// lists must have the same length; otherwise an error is returned.
//
// The result holds one map per row index i. Each map is the merge of every
// fetched record whose join id value, compared as a string, equals
// joinIds[joinid][i]; a row without a matching record yields an empty map.
// When the model has no join ids, an empty (non-nil) slice is returned.
//
// Fetching is best effort: an error from an individual feature view is printed
// to stdout and is not returned to the caller.
func (m *Model) GetOnlineFeatures(joinIds map[string][]interface{}) ([]map[string]interface{}, error) {
	// Validate that every required join id is present and that all key lists
	// have the same length; that length is the number of result rows.
	size := -1
	for _, joinid := range m.featureEntityJoinIdList {
		keys, ok := joinIds[joinid]
		if !ok {
			return nil, fmt.Errorf("join id:%s not found", joinid)
		}
		switch {
		case size == -1:
			size = len(keys)
		case size != len(keys):
			return nil, fmt.Errorf("join id:%s length not equal", joinid)
		}
	}
	// No join ids configured: there are no rows to build. Returning here also
	// avoids make() being called with a negative capacity, which panics.
	if size < 0 {
		return make([]map[string]interface{}, 0), nil
	}

	// Query every feature view of every requested join id concurrently; mu
	// guards joinIdFeaturesMap, which all goroutines append to.
	var mu sync.Mutex
	var wg sync.WaitGroup
	joinIdFeaturesMap := make(map[string][]map[string]interface{})
	for joinId, keys := range joinIds {
		featureViewMap := m.featureEntityJoinIdMap[joinId]
		for _, featureView := range featureViewMap {
			wg.Add(1)
			go func(featureView FeatureView, joinId string, keys []interface{}) {
				defer wg.Done()
				features, err := featureView.GetOnlineFeatures(keys, m.featureNamesMap[featureView.GetName()], m.aliasNamesMap[featureView.GetName()])
				if err != nil {
					// Best effort: report the failure and keep whatever was returned.
					fmt.Println(err)
				}
				mu.Lock()
				joinIdFeaturesMap[joinId] = append(joinIdFeaturesMap[joinId], features...)
				mu.Unlock()
			}(featureView, joinId, keys)
		}
	}
	wg.Wait()

	// Index the fetched records by their stringified join id value so each row
	// is resolved with one lookup instead of a scan over all records.
	recordsByJoinId := make(map[string]map[string][]map[string]interface{}, len(m.featureEntityJoinIdList))
	for _, joinid := range m.featureEntityJoinIdList {
		records := joinIdFeaturesMap[joinid]
		byValue := make(map[string][]map[string]interface{}, len(records))
		for _, record := range records {
			key := utils.ToString(record[joinid], "")
			byValue[key] = append(byValue[key], record)
		}
		recordsByJoinId[joinid] = byValue
	}

	featuresResult := make([]map[string]interface{}, 0, size)
	for i := 0; i < size; i++ {
		features := make(map[string]interface{}, len(m.Features)+len(m.featureEntityJoinIdMap))
		for _, joinid := range m.featureEntityJoinIdList {
			// The request side deliberately keeps its own default (" ") while
			// records are indexed with "", exactly as in the original comparison.
			lookup := utils.ToString(joinIds[joinid][i], " ")
			for _, record := range recordsByJoinId[joinid][lookup] {
				for k, v := range record {
					features[k] = v
				}
			}
		}
		featuresResult = append(featuresResult, features)
	}
	return featuresResult, nil
}