func()

in domain/model.go [65:124]


func (m *Model) GetOnlineFeatures(joinIds map[string][]interface{}) ([]map[string]interface{}, error) {

	size := -1
	for _, joinid := range m.featureEntityJoinIdList {
		if _, ok := joinIds[joinid]; !ok {
			return nil, fmt.Errorf("join id:%s not found", joinid)
		}
		if size == -1 {
			size = len(joinIds[joinid])
		} else {
			if size != len(joinIds[joinid]) {
				return nil, fmt.Errorf("join id:%s length not equal", joinid)
			}
		}
	}

	var mu sync.Mutex

	var wg sync.WaitGroup
	joinIdFeaturesMap := make(map[string][]map[string]interface{})
	for joinId, keys := range joinIds {
		featureViewMap := m.featureEntityJoinIdMap[joinId]

		for _, featureView := range featureViewMap {
			wg.Add(1)
			go func(featureView FeatureView, joinId string, keys []interface{}) {
				defer wg.Done()
				features, err := featureView.GetOnlineFeatures(keys, m.featureNamesMap[featureView.GetName()], m.aliasNamesMap[featureView.GetName()])
				if err != nil {
					fmt.Println(err)
				}

				mu.Lock()
				joinIdFeaturesMap[joinId] = append(joinIdFeaturesMap[joinId], features...)
				mu.Unlock()

			}(featureView, joinId, keys)
		}
	}
	wg.Wait()

	featuresResult := make([]map[string]interface{}, 0, size)
	for i := 0; i < size; i++ {
		features := make(map[string]interface{}, len(m.Features)+len(m.featureEntityJoinIdMap))
		for _, joinid := range m.featureEntityJoinIdList {
			joinIdValue := joinIds[joinid][i]
			for _, joinIdFeatures := range joinIdFeaturesMap[joinid] {
				if utils.ToString(joinIdFeatures[joinid], "") == utils.ToString(joinIdValue, " ") {
					for k, v := range joinIdFeatures {
						features[k] = v
					}
				}
			}
		}

		featuresResult = append(featuresResult, features)

	}
	return featuresResult, nil
}