func()

in dao/feature_view_hologres_dao.go [346:510]


func (d *FeatureViewHologresDao) GetUserBehaviorFeature(userIds []interface{}, events []interface{}, selectFields []string, sequenceConfig api.FeatureViewSeqConfig) ([]map[string]interface{}, error) {
	selector := make([]string, 0, len(selectFields))
	for _, field := range selectFields {
		selector = append(selector, fmt.Sprintf("\"%s\"", field))
	}
	currTime := time.Now().Unix()
	sequencePlayTimeMap := makePlayTimeMap(sequenceConfig.PlayTimeFilter)

	onlineFunc := func(userId interface{}) []map[string]interface{} {
		builder := sqlbuilder.PostgreSQL.NewSelectBuilder()
		builder.Select(selector...)
		builder.From(d.onlineTable)
		where := []string{builder.Equal(fmt.Sprintf("\"%s\"", d.primaryKeyField), userId),
			builder.GreaterThan(fmt.Sprintf("\"%s\"", sequenceConfig.TimestampField), currTime-86400*5)}
		if len(events) > 0 {
			where = append(where, builder.In(fmt.Sprintf("\"%s\"", sequenceConfig.EventField), events...))
		}
		builder.Where(where...)
		builder.OrderBy(fmt.Sprintf("\"%s\"", sequenceConfig.TimestampField)).Desc()
		sql, args := builder.Build()
		stmtKey := crc32.ChecksumIEEE([]byte(sql))
		stmt := d.getStmt(stmtKey)
		if stmt == nil {
			d.mu.Lock()
			stmt = d.stmtMap[stmtKey]
			if stmt == nil {
				stmt2, err := d.db.Prepare(sql)
				if err != nil {
					d.mu.Unlock()
					log.Println(err)
					return nil
				}
				d.stmtMap[stmtKey] = stmt2
				stmt = stmt2
				d.mu.Unlock()
			} else {
				d.mu.Unlock()
			}
		}
		rows, err := stmt.Query(args...)
		if err != nil {
			log.Println(err)
			return nil
		}
		defer rows.Close()
		columns, _ := rows.ColumnTypes()
		values := ColumnValues(columns)
		result := make([]map[string]interface{}, 0, len(userIds)*len(events)*50)

		for rows.Next() {
			if err := rows.Scan(values...); err == nil {
				properties := make(map[string]interface{}, len(values))
				for i, column := range columns {
					name := column.Name()
					if value := ParseColumnValues(values[i]); value != nil {
						properties[name] = value
					}
				}
				if t, exist := sequencePlayTimeMap[utils.ToString(properties[sequenceConfig.EventField], "")]; exist {
					if utils.ToFloat(properties[sequenceConfig.PlayTimeField], 0.0) <= t {
						continue
					}
				}
				result = append(result, properties)
			}
		}
		return result
	}
	offlineFunc := func(userId interface{}) []map[string]interface{} {
		builder := sqlbuilder.PostgreSQL.NewSelectBuilder()
		builder.Select(selector...)
		builder.From(d.offlineTable)
		where := []string{builder.Equal(fmt.Sprintf("\"%s\"", d.primaryKeyField), userId)}
		if len(events) > 0 {
			where = append(where, builder.In(fmt.Sprintf("\"%s\"", sequenceConfig.EventField), events...))
		}
		builder.Where(where...)
		builder.OrderBy(fmt.Sprintf("\"%s\"", sequenceConfig.TimestampField)).Desc()
		sql, args := builder.Build()
		stmtKey := crc32.ChecksumIEEE([]byte(sql))
		stmt := d.getStmt(stmtKey)
		if stmt == nil {
			d.mu.Lock()
			stmt = d.stmtMap[stmtKey]
			if stmt == nil {
				stmt2, err := d.db.Prepare(sql)
				if err != nil {
					d.mu.Unlock()
					log.Println(err)
					return nil
				}
				d.stmtMap[stmtKey] = stmt2
				stmt = stmt2
				d.mu.Unlock()
			} else {
				d.mu.Unlock()
			}
		}
		rows, err := stmt.Query(args...)
		if err != nil {
			log.Println(err)
			return nil
		}
		defer rows.Close()
		columns, _ := rows.ColumnTypes()
		values := ColumnValues(columns)
		result := make([]map[string]interface{}, 0, len(userIds)*len(events)*50)

		for rows.Next() {
			if err := rows.Scan(values...); err == nil {
				properties := make(map[string]interface{}, len(values))
				for i, column := range columns {
					name := column.Name()
					if value := ParseColumnValues(values[i]); value != nil {
						properties[name] = value
					}
				}
				if t, exist := sequencePlayTimeMap[utils.ToString(properties[sequenceConfig.EventField], "")]; exist {
					if utils.ToFloat(properties[sequenceConfig.PlayTimeField], 0.0) <= t {
						continue
					}
				}
				result = append(result, properties)
			}
		}
		return result
	}

	results := make([]map[string]interface{}, 0, len(userIds)*(len(events)+1)*50)
	var outmu sync.Mutex
	var wg sync.WaitGroup
	for _, userId := range userIds {
		wg.Add(1)
		go func(userId interface{}) {
			defer wg.Done()
			var innerWg sync.WaitGroup
			var offlineResult []map[string]interface{}
			var onlineResult []map[string]interface{}
			// offline table
			innerWg.Add(1)
			go func(userId interface{}) {
				defer innerWg.Done()
				offlineResult = offlineFunc(userId)
			}(userId)
			// online table
			innerWg.Add(1)
			go func(userId interface{}) {
				defer innerWg.Done()
				onlineResult = onlineFunc(userId)
			}(userId)
			innerWg.Wait()
			if offlineResult == nil || onlineResult == nil {
				fmt.Println("get user behavior feature failed")
				return
			}
			combinedResult := combineBehaviorFeatures(offlineResult, onlineResult, sequenceConfig.TimestampField)
			outmu.Lock()
			results = append(results, combinedResult...)
			outmu.Unlock()
		}(userId)
	}
	wg.Wait()

	return results, nil
}