func()

in dao/feature_view_featuredb_dao.go [699:874]


// GetUserSequenceFeature fetches sequence features for every entry in keys
// from the FeatureDB batch_get_kkv endpoint and returns one property map per
// key.
//
// For each key and each entry of onlineConfig a POST request is issued whose
// primary keys are "<key>\u001D<event>" for every "|"-separated event in
// SeqConfig.SeqEvent, limited to SeqConfig.SeqLen. The decoded records are
// filtered (see fetchDataFunc below), converted by makeSequenceFeatures and
// merged into the map for that key; the key itself is stored under
// userIdField.
//
// All (key, config) pairs are fetched concurrently, so the order of the
// returned slice is not guaranteed to match the order of keys.
//
// If any fetch reports an error, the first error drained from the internal
// error channel is returned and the result slice is nil. A non-200 HTTP status
// is only logged; the affected sequence is then treated as empty rather than
// failing the whole call.
func (d *FeatureViewFeatureDBDao) GetUserSequenceFeature(keys []interface{}, userIdField string, sequenceConfig api.FeatureViewSeqConfig, onlineConfig []*api.SeqConfig) ([]map[string]interface{}, error) {
	currTime := time.Now().Unix()
	sequencePlayTimeMap := makePlayTimeMap(sequenceConfig.PlayTimeFilter)

	// Every fetchDataFunc call sends at most one error before returning, and
	// there is one call per (key, config) pair, so sends never block.
	errChan := make(chan error, len(keys)*len(onlineConfig))

	// newRequest builds the batch_get_kkv POST request for the given body.
	// retry is forwarded to GetCurrentAddress; it is false for the first
	// attempt and true for the single retry.
	// NOTE(review): presumably GetCurrentAddress(true) switches to another
	// address — confirm against the client implementation.
	newRequest := func(body []byte, retry bool) (*http.Request, error) {
		url := fmt.Sprintf("%s/api/v1/tables/%s/%s/%s/batch_get_kkv", d.featureDBClient.GetCurrentAddress(retry), d.database, d.schema, d.table)
		req, err := http.NewRequest("POST", url, bytes.NewReader(body))
		if err != nil {
			return nil, err
		}
		req.Header.Set("Content-Type", "application/json")
		req.Header.Set("Authorization", d.featureDBClient.Token)
		req.Header.Set("Auth", d.signature)
		return req, nil
	}

	// fetchDataFunc loads the sequence records of one key for one SeqConfig.
	// It returns nil when an error was reported on errChan or the server
	// answered with a non-200 status, and a (possibly empty) non-nil slice on
	// success.
	fetchDataFunc := func(seqEvent string, seqLen int, key interface{}) []*sequenceInfo {
		sequences := []*sequenceInfo{}

		events := strings.Split(seqEvent, "|")
		pks := make([]string, 0, len(events))
		for _, event := range events {
			pks = append(pks, fmt.Sprintf("%v\u001D%s", key, event))
		}
		request := FeatureDBBatchGetKKVRequest{
			PKs:    pks,
			Length: seqLen,
		}
		body, err := json.Marshal(request)
		if err != nil {
			// Report the encoding failure instead of posting an empty body.
			errChan <- err
			return nil
		}

		req, err := newRequest(body, false)
		if err != nil {
			errChan <- err
			return nil
		}
		response, err := d.featureDBClient.Client.Do(req)
		if err != nil {
			// The first attempt failed at the transport level: retry once.
			req, err = newRequest(body, true)
			if err != nil {
				errChan <- err
				return nil
			}
			response, err = d.featureDBClient.Client.Do(req)
			if err != nil {
				errChan <- err
				return nil
			}
		}
		// Make sure the response body is always closed.
		defer response.Body.Close()

		// Check the status code.
		if response.StatusCode != http.StatusOK {
			bodyBytes, err := io.ReadAll(response.Body)
			if err != nil {
				errChan <- err
				return nil
			}
			var bodyMap map[string]interface{}
			if err := json.Unmarshal(bodyBytes, &bodyMap); err == nil {
				if msg, found := bodyMap["message"]; found {
					log.Printf("StatusCode: %d, Response message: %v\n", response.StatusCode, msg)
					return nil
				}
			}
			// No usable message in the body: still leave a trace of the failure.
			log.Printf("StatusCode: %d\n", response.StatusCode)
			return nil
		}

		reader := bufio.NewReader(response.Body)
		for {
			buf, err := deserialize(reader)
			if err == io.EOF {
				break // End of stream
			}
			if err != nil {
				errChan <- err
				return nil
			}

			kkvRecordBlock := fdbserverfb.GetRootAsKKVRecordBlock(buf, 0)

			for i := 0; i < kkvRecordBlock.ValuesLength(); i++ {
				kkv := new(fdbserverfb.KKVData)
				kkvRecordBlock.Values(kkv, i)

				// The primary key must have the form "<userId>\u001D<event>".
				userIdEvent := strings.Split(string(kkv.Pk()), "\u001D")
				if len(userIdEvent) != 2 {
					continue
				}

				var itemId string
				switch sequenceConfig.DeduplicationMethodNum {
				case 1:
					// The secondary key is the item id itself.
					itemId = string(kkv.Sk())
				case 2:
					// The secondary key has two "\u001D"-separated parts; the
					// first one is used as the item id.
					itemIdTimestamp := strings.Split(string(kkv.Sk()), "\u001D")
					if len(itemIdTimestamp) != 2 {
						continue
					}
					itemId = itemIdTimestamp[0]
				default:
					// Records are skipped for any other deduplication method.
					continue
				}

				seq := new(sequenceInfo)
				seq.event = userIdEvent[1]
				seq.itemId = itemId
				seq.timestamp = kkv.EventTimestamp()
				seq.playTime = kkv.PlayTime()

				if seq.event == "" || seq.itemId == "" {
					continue
				}
				// Drop records whose play time does not exceed the threshold
				// configured for their event, if one is configured.
				if t, exist := sequencePlayTimeMap[seq.event]; exist {
					if seq.playTime <= t {
						continue
					}
				}

				sequences = append(sequences, seq)
			}
		}

		return sequences
	}

	results := make([]map[string]interface{}, 0, len(keys))
	var outmu sync.Mutex // guards results

	var wg sync.WaitGroup
	for _, key := range keys {
		wg.Add(1)
		go func(key interface{}) {
			defer wg.Done()
			properties := make(map[string]interface{})
			var mu sync.Mutex // guards properties

			var eventWg sync.WaitGroup
			for _, seqConfig := range onlineConfig {
				eventWg.Add(1)
				go func(seqConfig *api.SeqConfig) {
					defer eventWg.Done()
					var onlineSequences []*sequenceInfo
					var offlineSequences []*sequenceInfo

					// FeatureDB has processed the integration of online sequence features and offline sequence features
					// Here we put the results into onlineSequences

					if onlineresult := fetchDataFunc(seqConfig.SeqEvent, seqConfig.SeqLen, key); onlineresult != nil {
						onlineSequences = onlineresult
					}

					subproperties := makeSequenceFeatures(offlineSequences, onlineSequences, seqConfig, sequenceConfig, currTime)
					mu.Lock()
					defer mu.Unlock()
					for k, value := range subproperties {
						properties[k] = value
					}
				}(seqConfig)
			}
			eventWg.Wait()

			properties[userIdField] = key
			outmu.Lock()
			results = append(results, properties)
			outmu.Unlock()
		}(key)
	}

	wg.Wait()

	// All senders have finished; closing lets the range below terminate.
	close(errChan)

	for err := range errChan {
		if err != nil {
			return nil, err
		}
	}

	return results, nil
}