module/user.go (283 lines of code) (raw):
package module
import (
"errors"
"strconv"
"strings"
"sync"
"sync/atomic"
"github.com/alibaba/pairec/v2/context"
)
// UID is the string identifier of a user; it is the type of User.Id.
type UID string
// User holds a user's identifier, its property map, per-name cached feature
// sets, and the bookkeeping used to signal completion of asynchronous feature
// loads. The methods in this file take mutex before reading or writing
// Properties and cacheFeatures.
type User struct {
// Id is the user identifier; serialized under the JSON key "uid".
Id UID `json:"uid"`
//Vector string
// mutex is locked by the User methods around access to Properties,
// cacheFeatures and featureAsyncLoadChClosed.
mutex sync.RWMutex
// Properties maps a property/feature name to its value.
Properties map[string]interface{} `json:"properties"`
// cacheFeatures maps a cache name to a feature set; excluded from JSON.
cacheFeatures map[string]map[string]any `json:"-"`
// featureAsyncLoadCount is the async-load counter, updated via sync/atomic.
featureAsyncLoadCount int32
// featureAsyncLoadCh is closed when the counter drops to zero.
featureAsyncLoadCh chan struct{}
// featureAsyncLoadChClosed records that featureAsyncLoadCh was already
// closed, so it is not closed twice.
featureAsyncLoadChClosed bool
}
// NewUser returns a User with the given id, empty (non-nil) property and
// cached-feature maps, a zero async-load counter and an open async-load
// channel with a buffer of one.
func NewUser(id string) *User {
	return &User{
		Id:                 UID(id),
		Properties:         make(map[string]interface{}),
		cacheFeatures:      make(map[string]map[string]any),
		featureAsyncLoadCh: make(chan struct{}, 1),
	}
}
// NewUserWithContext creates a User for id and seeds its properties from the
// "features" parameter of the recommend context.
//
// When that parameter is a map[string]interface{}, every entry is copied
// except string values that are empty; any other parameter type yields an
// otherwise empty property map. The "uid" property is always set to id.
func NewUserWithContext(id UID, context *context.RecommendContext) *User {
	user := NewUser(string(id))

	var props map[string]any
	if raw, isMap := context.GetParameter("features").(map[string]interface{}); isMap {
		props = make(map[string]any, len(raw))
		for name, value := range raw {
			// Only empty strings are filtered out; non-string values pass through.
			if s, isString := value.(string); isString && s == "" {
				continue
			}
			props[name] = value
		}
	} else {
		props = make(map[string]any, 64)
	}

	props["uid"] = string(id)
	user.Properties = props
	return user
}
// Clone returns a new User with the same Id and copies of the property map
// and of each cached feature set. The maps are copied one level deep; the
// values themselves are shared with the receiver.
//
// The clone's async-load channel is created already closed with its closed
// flag set, and its async-load counter is zero.
func (u *User) Clone() *User {
	done := make(chan struct{}, 1)
	clone := &User{
		Id:                       u.Id,
		Properties:               make(map[string]interface{}),
		cacheFeatures:            make(map[string]map[string]any),
		featureAsyncLoadCh:       done,
		featureAsyncLoadChClosed: true,
	}

	u.mutex.RLock()
	defer u.mutex.RUnlock()

	close(done)

	for key, value := range u.Properties {
		clone.Properties[key] = value
	}
	for name, cached := range u.cacheFeatures {
		dup := make(map[string]any, len(cached))
		for key, value := range cached {
			dup[key] = value
		}
		clone.cacheFeatures[name] = dup
	}
	return clone
}
// AddProperty sets the property key to value, replacing any existing value.
//
// The property map is created on demand, so the call is safe on a User whose
// Properties is nil (for example a zero-value User, or one whose map was
// replaced with nil through SetProperties).
func (u *User) AddProperty(key string, value interface{}) {
	u.mutex.Lock()
	// Deferred so the lock is released even if the write panics.
	defer u.mutex.Unlock()

	if u.Properties == nil {
		// Writing to a nil map panics; allocate lazily instead.
		u.Properties = make(map[string]interface{})
	}
	u.Properties[key] = value
}
// AddProperties copies every entry of properties into the user's property
// map, overwriting existing keys. A nil or empty argument is a no-op.
//
// The property map is created on demand, so the call is safe on a User whose
// Properties is nil.
func (u *User) AddProperties(properties map[string]interface{}) {
	if len(properties) == 0 {
		return
	}

	u.mutex.Lock()
	// Deferred so the lock is released even if a write panics.
	defer u.mutex.Unlock()

	if u.Properties == nil {
		// Writing to a nil map panics; allocate lazily instead.
		u.Properties = make(map[string]interface{}, len(properties))
	}
	for key, val := range properties {
		u.Properties[key] = val
	}
}
// FloatProperty returns the property key as a float64.
//
// Numeric values (float64, float32, int, int32, int64, uint) are converted
// directly; strings are parsed with strconv.ParseFloat and any parse error is
// returned. An error is returned when the key is absent ("property key not
// exist") or the stored value has any other type ("unsupported type").
func (u *User) FloatProperty(key string) (float64, error) {
	u.mutex.RLock()
	defer u.mutex.RUnlock()

	val, ok := u.Properties[key]
	if !ok {
		return 0, errors.New("property key not exist")
	}

	// The integer widths mirror the ones IntProperty accepts, so a value
	// readable as an int is also readable as a float.
	switch value := val.(type) {
	case float64:
		return value, nil
	case float32:
		return float64(value), nil
	case int:
		return float64(value), nil
	case int32:
		return float64(value), nil
	case int64:
		return float64(value), nil
	case uint:
		return float64(value), nil
	case string:
		return strconv.ParseFloat(value, 64)
	default:
		return 0, errors.New("unsupported type")
	}
}
// GetEmbeddingFeature returns the properties whose key ends in "embedding".
// String values have any leading and trailing '{' and '}' characters
// trimmed; values of other types are returned unchanged. The result is never
// nil.
func (u *User) GetEmbeddingFeature() (features map[string]interface{}) {
	features = make(map[string]interface{})

	u.mutex.RLock()
	defer u.mutex.RUnlock()

	for name, value := range u.Properties {
		if !strings.HasSuffix(name, "embedding") {
			continue
		}
		switch emb := value.(type) {
		case string:
			features[name] = strings.Trim(emb, "{}")
		default:
			features[name] = value
		}
	}
	return features
}
// MakeUserFeatures returns a copy of the user's properties without the
// "type" key. Strings that strconv.ParseFloat accepts are replaced by the
// parsed float64; every other value is copied unchanged. The result is never
// nil.
func (u *User) MakeUserFeatures() (features map[string]interface{}) {
	features = make(map[string]interface{})

	u.mutex.RLock()
	defer u.mutex.RUnlock()

	for name, raw := range u.Properties {
		if name == "type" {
			continue
		}
		switch typed := raw.(type) {
		case float64:
			features[name] = typed
		case string:
			if parsed, err := strconv.ParseFloat(typed, 64); err == nil {
				features[name] = parsed
			} else {
				// Not numeric: keep the original string.
				features[name] = raw
			}
		default:
			features[name] = raw
		}
	}
	return features
}
// MakeUserFeatures2 returns a shallow copy of all user properties, with no
// filtering or conversion. It is intended for the easyrec processor. The
// result is never nil.
func (u *User) MakeUserFeatures2() (features map[string]interface{}) {
	u.mutex.RLock()
	defer u.mutex.RUnlock()

	snapshot := make(map[string]interface{}, len(u.Properties))
	for name, value := range u.Properties {
		snapshot[name] = value
	}
	return snapshot
}
// StringProperty returns the property key rendered as a string.
//
// Strings are returned as-is; int, int32, int64 and uint are formatted in
// base 10; float64 is formatted with the shortest 'f' representation. The
// empty string is returned when the key is absent or the value has any other
// type.
func (u *User) StringProperty(key string) string {
	u.mutex.RLock()
	defer u.mutex.RUnlock()

	// A missing key yields a nil interface, which falls through to default.
	switch value := u.Properties[key].(type) {
	case string:
		return value
	case int:
		return strconv.Itoa(value)
	case int32:
		return strconv.FormatInt(int64(value), 10)
	case int64:
		// FormatInt keeps the full 64-bit value; converting to int first
		// would truncate on 32-bit platforms.
		return strconv.FormatInt(value, 10)
	case uint:
		return strconv.FormatUint(uint64(value), 10)
	case float64:
		return strconv.FormatFloat(value, 'f', -1, 64)
	default:
		return ""
	}
}
// SetProperties replaces the user's property map with p. The map is stored
// as given, not copied, so later changes to p are visible through the user.
func (u *User) SetProperties(p map[string]interface{}) {
	u.mutex.Lock()
	defer u.mutex.Unlock()

	u.Properties = p
}
// DeleteProperty removes the property key; an absent key is a no-op.
func (u *User) DeleteProperty(key string) {
	u.mutex.Lock()
	defer u.mutex.Unlock()

	delete(u.Properties, key)
}
// DeleteProperties removes every property named in features under a single
// lock acquisition; names that are not present are ignored.
func (u *User) DeleteProperties(features []string) {
	u.mutex.Lock()
	defer u.mutex.Unlock()

	for i := range features {
		delete(u.Properties, features[i])
	}
}
// IntProperty returns the property key as an int.
//
// int, int32, int64, uint and float64 values are converted with a plain Go
// conversion (float64 is truncated toward zero); strings are parsed with
// strconv.Atoi. An error is returned when the key is absent ("property key
// not exist") or the value has any other type ("unsupported type").
func (u *User) IntProperty(key string) (int, error) {
	u.mutex.RLock()
	defer u.mutex.RUnlock()

	raw, found := u.Properties[key]
	if !found {
		return 0, errors.New("property key not exist")
	}

	switch n := raw.(type) {
	case int:
		return n, nil
	case int32:
		return int(n), nil
	case int64:
		return int(n), nil
	case uint:
		return int(n), nil
	case float64:
		return int(n), nil
	case string:
		return strconv.Atoi(n)
	}
	return 0, errors.New("unsupported type")
}
// GetProperty returns the raw value stored under key, or nil when the key is
// absent.
func (u *User) GetProperty(key string) interface{} {
	u.mutex.RLock()
	defer u.mutex.RUnlock()

	// Indexing a map with a missing key yields the zero value, which for
	// interface{} is nil.
	return u.Properties[key]
}
// AddCacheFeatures merges features into the cached feature set stored under
// key, creating that set if needed and overwriting entries with the same
// name. The entries are copied, so the caller's map is not retained.
//
// The cache itself is created on demand, so the call is safe on a User whose
// cacheFeatures map is nil (for example a zero-value User).
func (u *User) AddCacheFeatures(key string, features map[string]any) {
	u.mutex.Lock()
	defer u.mutex.Unlock()

	if u.cacheFeatures == nil {
		// Writing to a nil map panics; allocate lazily instead.
		u.cacheFeatures = make(map[string]map[string]any)
	}

	m := u.cacheFeatures[key]
	if m == nil {
		m = make(map[string]any, len(features))
		u.cacheFeatures[key] = m
	}
	for k, v := range features {
		m[k] = v
	}
}
// LoadCacheFeatures copies the cached feature set stored under key into the
// user's properties, overwriting properties with the same name. An unknown
// key or an empty set leaves the properties untouched.
//
// The property map is created on demand, so the call is safe on a User whose
// Properties is nil.
func (u *User) LoadCacheFeatures(key string) {
	u.mutex.Lock()
	defer u.mutex.Unlock()

	cached := u.cacheFeatures[key]
	if len(cached) == 0 {
		return
	}
	if u.Properties == nil {
		// Writing to a nil map panics; allocate lazily instead.
		u.Properties = make(map[string]interface{}, len(cached))
	}
	for k, v := range cached {
		u.Properties[k] = v
	}
}
// GetCacheFeatures returns a copy of the cached feature set stored under
// key. The result is an empty, non-nil map when no set exists for key.
func (u *User) GetCacheFeatures(key string) (result map[string]interface{}) {
	result = make(map[string]interface{})

	u.mutex.RLock()
	defer u.mutex.RUnlock()

	// Ranging over the nil map returned for a missing key does nothing.
	for name, value := range u.cacheFeatures[key] {
		result[name] = value
	}
	return result
}
// IncrementFeatureAsyncLoadCount atomically adds count to the async-load
// counter. It does not take the user's mutex.
func (u *User) IncrementFeatureAsyncLoadCount(count int32) {
	counter := &u.featureAsyncLoadCount
	atomic.AddInt32(counter, count)
}
// DescFeatureAsyncLoadCount subtracts count from the async-load counter and,
// once the counter is no longer positive, closes the channel returned by
// FeatureAsyncLoadCh. The channel is closed at most once; the closed flag
// guards against a second close.
//
// It panics if the counter is already below 1 when called.
func (u *User) DescFeatureAsyncLoadCount(count int32) {
	u.mutex.Lock()
	defer u.mutex.Unlock()

	if atomic.LoadInt32(&u.featureAsyncLoadCount) < 1 {
		panic("featureAsyncLoadCount not less than 0")
	}

	remaining := atomic.AddInt32(&u.featureAsyncLoadCount, -count)
	// The guard above only ensures the counter was >= 1, so a count larger
	// than the counter leaves it negative. Treat "zero or fewer" as done;
	// testing for exactly zero would skip the close and leave receivers on
	// the channel waiting indefinitely.
	if remaining > 0 || u.featureAsyncLoadChClosed {
		return
	}
	close(u.featureAsyncLoadCh)
	u.featureAsyncLoadChClosed = true
}
// FeatureAsyncLoadCount returns the current value of the async-load counter,
// read atomically.
func (u *User) FeatureAsyncLoadCount() int32 {
	current := atomic.LoadInt32(&u.featureAsyncLoadCount)
	return current
}
// GetCacheFeaturesNames returns the names under which cached feature sets
// are stored, in map iteration order (unspecified). The result is nil when
// there are none.
func (u *User) GetCacheFeaturesNames() (ret []string) {
	u.mutex.RLock()
	defer u.mutex.RUnlock()

	var names []string
	for name := range u.cacheFeatures {
		names = append(names, name)
	}
	return names
}
// FeatureAsyncLoadCh returns a receive-only view of the async-load channel.
// DescFeatureAsyncLoadCount closes that channel when the async-load counter
// reaches zero.
func (u *User) FeatureAsyncLoadCh() <-chan struct{} {
	var done <-chan struct{} = u.featureAsyncLoadCh
	return done
}