func()

in dao/feature_view_featuredb_dao.go [882:1138]


// GetUserBehaviorFeature fetches behavior records from FeatureDB for every
// entry of userIds and returns them as one flat slice of field-name -> value
// maps.
//
// One goroutine is started per user. When events is empty the user's records
// are fetched through the scan_kkv endpoint using the prefix "<user>\u001D";
// otherwise one primary key "<user>\u001D<event>" is built per event and the
// batch_get_kkv endpoint is used. Only the fields named in selectFields are
// decoded into the returned maps; every other field of d.fields is skipped.
// A record whose event (sequenceConfig.EventField) has an entry in the
// play-time map built from sequenceConfig.PlayTimeFilter is dropped when its
// sequenceConfig.PlayTimeField value is less than or equal to that entry.
//
// If any user's fetch or decode fails, one of the encountered errors is
// returned together with nil results. The order of records coming from
// different users is not deterministic.
func (d *FeatureViewFeatureDBDao) GetUserBehaviorFeature(userIds []interface{}, events []interface{}, selectFields []string, sequenceConfig api.FeatureViewSeqConfig) ([]map[string]interface{}, error) {
	selectFieldsSet := make(map[string]struct{}, len(selectFields))
	for _, selectField := range selectFields {
		selectFieldsSet[selectField] = struct{}{}
	}
	sequencePlayTimeMap := makePlayTimeMap(sequenceConfig.PlayTimeFilter)

	// postKKV POSTs body to the given table endpoint. The first attempt uses
	// GetCurrentAddress(false); if sending fails, a second attempt is made
	// with GetCurrentAddress(true).
	// NOTE(review): presumably the `true` argument asks the client for an
	// alternative address — confirm against the client implementation.
	postKKV := func(endpoint string, body []byte) (*http.Response, error) {
		var lastErr error
		for _, addressArg := range []bool{false, true} {
			url := fmt.Sprintf("%s/api/v1/tables/%s/%s/%s/%s", d.featureDBClient.GetCurrentAddress(addressArg), d.database, d.schema, d.table, endpoint)
			req, err := http.NewRequest("POST", url, bytes.NewReader(body))
			if err != nil {
				return nil, err
			}
			req.Header.Set("Content-Type", "application/json")
			req.Header.Set("Authorization", d.featureDBClient.Token)
			req.Header.Set("Auth", d.signature)

			response, err := d.featureDBClient.Client.Do(req)
			if err == nil {
				return response, nil
			}
			lastErr = err
		}
		return nil, lastErr
	}

	// fetchUser retrieves and decodes all records of a single user.
	fetchUser := func(userID interface{}) ([]map[string]interface{}, error) {
		var (
			endpoint string
			body     []byte
			err      error
		)
		if len(events) == 0 {
			endpoint = "scan_kkv"
			body, err = json.Marshal(FeatureDBScanKKVRequest{
				Prefixs:   []string{fmt.Sprintf("%v\u001D", userID)},
				WithValue: true,
			})
		} else {
			endpoint = "batch_get_kkv"
			pks := make([]string, 0, len(events))
			for _, event := range events {
				pks = append(pks, fmt.Sprintf("%v\u001D%v", userID, event))
			}
			body, err = json.Marshal(FeatureDBBatchGetKKVRequest{
				PKs:       pks,
				WithValue: true,
			})
		}
		if err != nil {
			return nil, err
		}

		response, err := postKKV(endpoint, body)
		if err != nil {
			return nil, err
		}
		defer response.Body.Close() // make sure the response body is closed

		// Check the status code. A non-OK response is logged (when the body
		// is JSON with a "message" key) and contributes no records; it is
		// not reported as an error.
		if response.StatusCode != http.StatusOK {
			bodyBytes, err := io.ReadAll(response.Body)
			if err != nil {
				return nil, err
			}

			var bodyMap map[string]interface{}
			if err := json.Unmarshal(bodyBytes, &bodyMap); err == nil {
				if msg, found := bodyMap["message"]; found {
					log.Printf("StatusCode: %d, Response message: %s\n", response.StatusCode, msg)
				}
			}
			return nil, nil
		}

		reader := bufio.NewReader(response.Body)
		innerReader := readerPool.Get().(*bytes.Reader)
		defer readerPool.Put(innerReader)

		// decodeRecord reads the fields of one record from innerReader, which
		// must already be positioned just after the two version bytes. Each
		// field is preceded by a one-byte null flag (1 means null, no value
		// follows). Running out of data while reading a null flag ends the
		// record. Read errors on the values themselves are not checked; a
		// value that cannot be read is left at its zero value.
		decodeRecord := func() (map[string]interface{}, error) {
			properties := make(map[string]interface{})

			for _, field := range d.fields {
				var isNull uint8
				if err := binary.Read(innerReader, binary.LittleEndian, &isNull); err != nil {
					if err == io.EOF {
						break
					}
					return nil, err
				}
				if isNull == 1 {
					// skip null values
					continue
				}

				if _, selected := selectFieldsSet[field]; selected {
					switch d.fieldTypeMap[field] {
					case constants.FS_INT32:
						var int32Value int32
						binary.Read(innerReader, binary.LittleEndian, &int32Value)
						properties[field] = int32Value
					case constants.FS_INT64:
						var int64Value int64
						binary.Read(innerReader, binary.LittleEndian, &int64Value)
						properties[field] = int64Value
					case constants.FS_FLOAT:
						var float32Value float32
						binary.Read(innerReader, binary.LittleEndian, &float32Value)
						properties[field] = float32Value
					case constants.FS_DOUBLE:
						var float64Value float64
						binary.Read(innerReader, binary.LittleEndian, &float64Value)
						properties[field] = float64Value
					case constants.FS_BOOLEAN:
						var boolValue bool
						binary.Read(innerReader, binary.LittleEndian, &boolValue)
						properties[field] = boolValue
					default:
						// FS_STRING and every other type: a uint32 length
						// followed by that many bytes, returned as a string.
						var length uint32
						binary.Read(innerReader, binary.LittleEndian, &length)
						strBytes := make([]byte, length)
						binary.Read(innerReader, binary.LittleEndian, &strBytes)
						properties[field] = string(strBytes)
					}
					continue
				}

				// Field not requested: advance past its value without decoding.
				var skipBytes int
				switch d.fieldTypeMap[field] {
				case constants.FS_INT32, constants.FS_FLOAT:
					skipBytes = 4
				case constants.FS_INT64, constants.FS_DOUBLE:
					skipBytes = 8
				case constants.FS_BOOLEAN:
					skipBytes = 1
				default:
					// FS_STRING and every other type are length-prefixed.
					var length uint32
					binary.Read(innerReader, binary.LittleEndian, &length)
					skipBytes = int(length)
				}
				if skipBytes > 0 {
					if _, err := innerReader.Seek(int64(skipBytes), io.SeekCurrent); err != nil {
						return nil, err
					}
				}
			}
			return properties, nil
		}

		var rows []map[string]interface{}
		for {
			buf, err := deserialize(reader)
			if err == io.EOF {
				break // end of stream
			}
			if err != nil {
				return nil, err
			}
			kkvRecordBlock := fdbserverfb.GetRootAsKKVRecordBlock(buf, 0)

			for i := 0; i < kkvRecordBlock.ValuesLength(); i++ {
				kkv := new(fdbserverfb.KKVData)
				kkvRecordBlock.Values(kkv, i)
				dataBytes := kkv.ValueBytes()
				if len(dataBytes) < 2 {
					// Too short to hold the two version bytes; nothing to decode.
					continue
				}
				innerReader.Reset(dataBytes)

				// Read the version bytes. These reads cannot fail because
				// dataBytes holds at least two bytes.
				var protocalVersion, ifNullFlagVersion uint8
				binary.Read(innerReader, binary.LittleEndian, &protocalVersion)
				binary.Read(innerReader, binary.LittleEndian, &ifNullFlagVersion)

				if protocalVersion != FeatureDB_Protocal_Version_F || ifNullFlagVersion != FeatureDB_IfNull_Flag_Version_1 {
					return nil, fmt.Errorf("unsupported protocal version: %d, ifNullFlagVersion: %d", protocalVersion, ifNullFlagVersion)
				}

				row, err := decodeRecord()
				if err != nil {
					return nil, err
				}
				// Drop records whose play time does not exceed the threshold
				// configured for their event.
				if t, exist := sequencePlayTimeMap[utils.ToString(row[sequenceConfig.EventField], "")]; exist {
					if utils.ToFloat(row[sequenceConfig.PlayTimeField], 0.0) <= t {
						continue
					}
				}
				rows = append(rows, row)
			}
		}

		return rows, nil
	}

	results := make([]map[string]interface{}, 0, len(userIds)*(len(events)+1)*50)
	// Buffered so that every worker can report its (single) error without
	// blocking, even though errors are only collected after all workers finish.
	errChan := make(chan error, len(userIds))
	var outmu sync.Mutex
	var wg sync.WaitGroup

	for _, userId := range userIds {
		wg.Add(1)
		go func(userId interface{}) {
			defer wg.Done()
			rows, err := fetchUser(userId)
			if err != nil {
				errChan <- err
				return
			}
			outmu.Lock()
			results = append(results, rows...)
			outmu.Unlock()
		}(userId)
	}
	wg.Wait()
	close(errChan)

	// Surface a failure instead of silently returning partial data.
	if err, failed := <-errChan; failed {
		return nil, err
	}

	return results, nil
}