datasource/kafka/kafka_log.go (21 lines of code) (raw):
package kafka
import (
"fmt"
"github.com/alibaba/pairec/v2/context"
"github.com/alibaba/pairec/v2/module"
"github.com/alibaba/pairec/v2/service/hook"
)
type FeatureLogKafkaFunc func(*KafkaProducer, *module.User, []*module.Item, *context.RecommendContext)
func FeatureLogToKafka(kafkaName string, f FeatureLogKafkaFunc) {
producer, err := GetKafkaProducer(kafkaName)
if err != nil {
panic(fmt.Sprintf("get kafka producer error, :%v", err))
}
hook.AddRecommendCleanHook(func(producer *KafkaProducer, f FeatureLogKafkaFunc) hook.RecommendCleanHookFunc {
return func(context *context.RecommendContext, params ...interface{}) {
user := params[0].(*module.User)
items := params[1].([]*module.Item)
f(producer, user, items, context)
}
}(producer, f))
}