service/debug/debug_service.go (475 lines of code) (raw):
package debug
import (
"encoding/json"
"fmt"
"github.com/alibaba/pairec/v2/context"
"github.com/alibaba/pairec/v2/datasource/datahub"
"github.com/alibaba/pairec/v2/datasource/kafka"
"github.com/alibaba/pairec/v2/log"
"github.com/alibaba/pairec/v2/module"
"github.com/alibaba/pairec/v2/recconf"
"math/rand"
"os"
"path/filepath"
"runtime/debug"
"sort"
"strconv"
"strings"
"sync"
"time"
)
// LogOutputer is the sink that DebugService hands every debug record to.
// A record is a flat map (keys such as request_id, module, uid, retrieveid,
// items) that implementations serialize to JSON.
type LogOutputer interface {
	WriteLog(record map[string]interface{})
}
// ConsoleOutput writes each debug record to stdout as one line of compact JSON.
type ConsoleOutput struct {
}

// WriteLog prints log as compact JSON followed by a newline. If the record
// cannot be JSON-encoded, the encoding error is printed instead. Previously the
// error was dropped and an empty line was emitted.
func (t *ConsoleOutput) WriteLog(log map[string]interface{}) {
	j, err := json.Marshal(log)
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(string(j))
}
// DatahubOutput sends each debug record to the datahub client registered
// under DebugConfig.DatahubName.
type DatahubOutput struct {
	datahub *datahub.Datahub // nil when the client lookup failed
}

// NewDatahubOutput looks up the datahub client named config.DatahubName.
// A lookup failure is printed to stdout. The returned output then has no
// client and silently drops records, so the request itself is not failed.
func NewDatahubOutput(config *recconf.DebugConfig) *DatahubOutput {
	hub := DatahubOutput{}
	datahubclient, err := datahub.GetDatahub(config.DatahubName)
	if err != nil {
		fmt.Println(err)
		return &hub
	}
	hub.datahub = datahubclient
	return &hub
}

// WriteLog sends log as a one-element message batch. When no client is
// available the record is dropped, instead of calling SendMessage on a nil
// *datahub.Datahub.
func (t *DatahubOutput) WriteLog(log map[string]interface{}) {
	if t.datahub == nil {
		return
	}
	t.datahub.SendMessage([]map[string]interface{}{log})
}
// KafkaOutput publishes each debug record through the kafka producer
// registered under DebugConfig.KafKaName.
type KafkaOutput struct {
	kafka *kafka.KafkaProducer // nil when the producer lookup failed
}

// NewKafkaOutput looks up the kafka producer named config.KafKaName.
// A lookup failure is printed to stdout. The returned output then has no
// producer and silently drops records.
func NewKafkaOutput(config *recconf.DebugConfig) *KafkaOutput {
	hub := KafkaOutput{}
	kafkaclient, err := kafka.GetKafkaProducer(config.KafKaName)
	if err != nil {
		fmt.Println(err)
		return &hub
	}
	hub.kafka = kafkaclient
	return &hub
}

// WriteLog publishes logs as a JSON message. The message key is the JSON
// encoding of logs["module"]; for a string module name that includes the
// surrounding quotes. Records are dropped when no producer is available or
// when encoding fails; encoding errors are printed rather than sending an
// empty payload.
func (k *KafkaOutput) WriteLog(logs map[string]interface{}) {
	if k.kafka == nil {
		return
	}
	key, err := json.Marshal(logs["module"])
	if err != nil {
		fmt.Println(err)
		return
	}
	messages, err := json.Marshal(logs)
	if err != nil {
		fmt.Println(err)
		return
	}
	k.kafka.SendMessages(key, messages)
}
// fileOutputMux serializes FileOutput file operations process-wide. It is
// package-level rather than a field because every DebugService builds its own
// FileOutput, while all of them share the same directory.
var fileOutputMux sync.Mutex

// FileOutput appends debug records to files named
// "pairec.debug.<yyyyMMddHHmmss>.log" inside a directory, rotating and
// pruning them as they grow.
type FileOutput struct {
	path       string // target directory; always ends in "/"
	maxFileNum int    // upper bound on debug files retained in path
}

// NewFileOutput builds a FileOutput for config.FilePath, keeping at most
// config.MaxFileNum files (20 when MaxFileNum is not positive).
func NewFileOutput(config *recconf.DebugConfig) *FileOutput {
	file := FileOutput{
		// Clean collapses redundant separators and "." / ".." elements. An empty
		// FilePath becomes "." (the working directory) rather than the
		// filesystem root "/" it turned into when the Clean result was discarded.
		path:       filepath.Clean(config.FilePath),
		maxFileNum: 20,
	}
	if !strings.HasSuffix(file.path, "/") {
		file.path += "/"
	}
	if config.MaxFileNum > 0 {
		file.maxFileNum = config.MaxFileNum
	}
	return &file
}
// WriteLog appends log to the current debug file under t.path, encoded as
// indented JSON followed by a newline. Errors are printed to stdout and the
// record is dropped.
//
// fileOutputMux is held for the whole operation, not just the write. A
// FileOutput is created per request and records are written from separate
// goroutines. Directory creation, choosing the current file, rotation and
// pruning therefore have to be serialized together with the append.
// Otherwise two writers could both rotate, or delete a file the other is
// about to use.
func (t *FileOutput) WriteLog(log map[string]interface{}) {
	logData, err := json.MarshalIndent(log, "", " ")
	if err != nil {
		fmt.Println(err)
		return
	}
	logData = append(logData, '\n')

	fileOutputMux.Lock()
	defer fileOutputMux.Unlock()

	if err := os.MkdirAll(t.path, 0755); err != nil {
		fmt.Println(err)
		return
	}
	f, err := t.getCurrFile()
	if err != nil {
		fmt.Println(err)
		return
	}
	defer f.Close()

	if _, err := f.Write(logData); err != nil {
		fmt.Println(err)
	}
}
// getCurrFile returns an open, append-mode handle to the file the next record
// should be written to, applying the rotation and retention policy:
//
//   - no debug files yet: create a new one;
//   - newest file larger than 1 GiB: close it, delete the oldest files so that
//     at most maxFileNum remain once the new file exists, then create it;
//   - otherwise: reuse the newest file and delete the oldest files beyond
//     maxFileNum.
//
// The caller owns the returned file and must close it. On error, no file
// opened here is left open.
func (t *FileOutput) getCurrFile() (*os.File, error) {
	const maxFileSize = 1024 * 1024 * 1024

	files, err := t.getSortedFiles()
	if err != nil {
		return nil, err
	}
	if len(files) == 0 {
		return t.newFile()
	}

	// removeOldest deletes the n oldest debug files (files is sorted oldest
	// first). n is clamped so a non-positive maxFileNum cannot index past the end.
	removeOldest := func(n int) error {
		if n > len(files) {
			n = len(files)
		}
		for i := 0; i < n; i++ {
			if err := os.Remove(filepath.Join(t.path, files[i].Name())); err != nil {
				return err
			}
		}
		return nil
	}

	f, err := os.OpenFile(filepath.Join(t.path, files[len(files)-1].Name()), os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
	if err != nil {
		return nil, err
	}
	fileInfo, err := f.Stat()
	if err != nil {
		f.Close() // do not leak the descriptor on the error path
		return nil, err
	}

	if fileInfo.Size() <= maxFileSize {
		// Keep appending to the newest file; only trim excess old files.
		if len(files) > t.maxFileNum {
			if err := removeOldest(len(files) - t.maxFileNum); err != nil {
				f.Close()
				return nil, err
			}
		}
		return f, nil
	}

	// The newest file is full: rotate. One extra slot is freed for the new file.
	f.Close()
	if len(files) >= t.maxFileNum {
		if err := removeOldest(len(files) - t.maxFileNum + 1); err != nil {
			return nil, err
		}
	}
	return t.newFile()
}
// newFile opens "pairec.debug.<yyyyMMddHHmmss>.log" in t.path for appending,
// creating it if needed. The stamp is local time at second resolution, so a
// second call within the same second reopens the same file instead of
// truncating it.
func (t *FileOutput) newFile() (*os.File, error) {
	stamp := time.Now().Format("20060102150405")
	name := t.path + "pairec.debug." + stamp + ".log"
	return os.OpenFile(name, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
}
// getSortedFiles lists the "pairec.debug.*.log" files directly inside t.path
// (subdirectories are not descended into), ordered oldest first by the
// timestamp embedded in their names.
//
// Any error reported by filepath.Walk is returned. The callback must not touch
// info in that case, because Walk may pass a nil info together with the error.
func (t *FileOutput) getSortedFiles() ([]os.FileInfo, error) {
	files := []os.FileInfo{}
	err := filepath.Walk(t.path, func(path string, info os.FileInfo, err error) error {
		if err != nil {
			return err
		}
		if info.IsDir() {
			if path != t.path {
				return filepath.SkipDir
			}
			return nil
		}
		if strings.HasPrefix(info.Name(), "pairec.debug.") && strings.HasSuffix(info.Name(), ".log") {
			files = append(files, info)
		}
		return nil
	})
	if err != nil {
		return nil, err
	}
	sort.Slice(files, func(i, j int) bool {
		return t.getFileTimestamp(files[i]) < t.getFileTimestamp(files[j])
	})
	return files, nil
}
// getFileTimestamp extracts the numeric stamp from a
// "pairec.debug.<stamp>.log" file name so files can be ordered by age.
// The ParseInt error is deliberately ignored. A non-numeric stamp yields 0,
// so the file sorts first; an out-of-range stamp yields ParseInt's clamped
// value.
func (t *FileOutput) getFileTimestamp(file os.FileInfo) int64 {
	stamp := strings.TrimPrefix(file.Name(), "pairec.debug.")
	stamp = strings.TrimSuffix(stamp, ".log")
	value, _ := strconv.ParseInt(stamp, 10, 64)
	return value
}
// EmptyOutput discards every record. DebugService falls back to it when the
// configured OutputType is not recognized.
type EmptyOutput struct{}

// WriteLog ignores record.
func (e *EmptyOutput) WriteLog(record map[string]interface{}) {}
// DebugService writes per-request debug records for the recommendation
// pipeline stages ("recall", "filter", "general_rank", "rank", "sort",
// "recommend") to the configured LogOutputer.
type DebugService struct {
// logFlag reports whether this request was selected for debug logging;
// every Write*Log method does nothing when it is false.
logFlag bool
// logOutputer is the sink chosen from DebugConfig.OutputType; it is only
// assigned when logFlag is true.
logOutputer LogOutputer
// requestTime is the Unix time in seconds taken when logging was enabled,
// stamped into each record as "request_time".
requestTime int64
}
// NewDebugService builds the debug logger for one request.
//
// The DebugConfig is looked up in priority order:
//  1. the experiment parameter "debugConfig", a JSON-encoded recconf.DebugConfig
//     (an unparsable value falls through to the next source);
//  2. context.Config.DebugConfs keyed by the request's "scene" parameter.
//
// When neither applies, the returned service has logging disabled. Otherwise
// logging is enabled when Rate == 100, when the user's id is listed in
// DebugUsers, or else with probability Rate/100.
func NewDebugService(user *module.User, context *context.RecommendContext) *DebugService {
	service := DebugService{
		logFlag: false,
	}

	var debugConfig recconf.DebugConfig
	found := false
	if context.ExperimentResult != nil {
		data := context.ExperimentResult.GetExperimentParams().GetString("debugConfig", "")
		if data != "" {
			if err := json.Unmarshal([]byte(data), &debugConfig); err == nil {
				found = true
			}
		}
	}
	if !found {
		// comma-ok: a missing or non-string "scene" parameter must not panic;
		// it simply means there is no scene-level debug config.
		if scene, ok := context.GetParameter("scene").(string); ok {
			if config, ok := context.Config.DebugConfs[scene]; ok {
				found = true
				debugConfig = config
			}
		}
	}
	if !found {
		// not found debug config, not set logflag
		return &service
	}

	if debugConfig.Rate == 100 {
		service.logFlag = true
	} else {
		uid := string(user.Id)
		for _, debugUID := range debugConfig.DebugUsers {
			if debugUID == uid {
				service.logFlag = true
				break
			}
		}
		if !service.logFlag && rand.Intn(100) < debugConfig.Rate {
			service.logFlag = true
		}
	}

	if service.logFlag {
		service.requestTime = time.Now().Unix()
		service.init(&debugConfig)
	}
	return &service
}
// init chooses d.logOutputer from config.OutputType: "console", "datahub",
// "file" or "kafka". Any other value installs EmptyOutput, which discards
// records.
func (d *DebugService) init(config *recconf.DebugConfig) {
	var out LogOutputer
	mode := config.OutputType
	switch mode {
	case "datahub":
		out = NewDatahubOutput(config)
	case "kafka":
		out = NewKafkaOutput(config)
	case "file":
		out = NewFileOutput(config)
	case "console":
		out = new(ConsoleOutput)
	default:
		out = new(EmptyOutput)
	}
	d.logOutputer = out
}
// WriteRecallLog asynchronously logs the recall-stage items when this request
// is being debugged. Each item's id, RetrieveId and Score, plus its
// "trigger_id" property, are snapshotted before the goroutine starts. Later
// changes to those item fields by the caller therefore do not reach the record.
func (d *DebugService) WriteRecallLog(user *module.User, items []*module.Item, context *context.RecommendContext) {
	if !d.logFlag {
		return
	}
	triggers := make(map[module.ItemId]string, len(items))
	snapshot := make([]*module.Item, 0, len(items))
	for i := range items {
		src := items[i]
		triggers[src.Id] = src.StringProperty("trigger_id")
		dup := module.NewItem(string(src.Id))
		dup.RetrieveId = src.RetrieveId
		dup.Score = src.Score
		snapshot = append(snapshot, dup)
	}
	go d.doWriteRecallLog(user, snapshot, context, triggers)
}
// WriteFilterLog asynchronously logs the filter-stage items when this request
// is being debugged. The slice is copied before the goroutine starts, as
// WriteGeneralLog does. If the caller later reorders or truncates items in
// place, the logger therefore does not race with it. The *module.Item values
// themselves are still shared.
func (d *DebugService) WriteFilterLog(user *module.User, items []*module.Item, context *context.RecommendContext) {
	if d.logFlag {
		snapshot := make([]*module.Item, len(items))
		copy(snapshot, items)
		go d.doWriteFilterLog(user, snapshot, context)
	}
}
// WriteGeneralLog asynchronously logs the general-rank items when this request
// is being debugged. The goroutine receives a shallow copy of the slice, so the
// caller may reorder or truncate items freely; the items are still shared.
func (d *DebugService) WriteGeneralLog(user *module.User, items []*module.Item, context *context.RecommendContext) {
	if !d.logFlag {
		return
	}
	snapshot := append(make([]*module.Item, 0, len(items)), items...)
	go d.doWriteGeneralLog(user, snapshot, context)
}
// WriteRankLog asynchronously logs the rank-stage items when this request is
// being debugged. Every item is deep-cloned first, so later score updates by
// the caller do not leak into the record.
func (d *DebugService) WriteRankLog(user *module.User, items []*module.Item, context *context.RecommendContext) {
	if !d.logFlag {
		return
	}
	clones := make([]*module.Item, len(items))
	for i, it := range items {
		clones[i] = it.DeepClone()
	}
	go d.doWriteRankLog(user, clones, context)
}
// WriteSortLog asynchronously logs the sort-stage items when this request is
// being debugged. The slice is copied before the goroutine starts, as
// WriteGeneralLog does. If the caller later reorders or truncates items in
// place, the logger therefore does not race with it. The *module.Item values
// themselves are still shared.
func (d *DebugService) WriteSortLog(user *module.User, items []*module.Item, context *context.RecommendContext) {
	if d.logFlag {
		snapshot := make([]*module.Item, len(items))
		copy(snapshot, items)
		go d.doWriteSortLog(user, snapshot, context)
	}
}
// WriteRecommendLog asynchronously logs the final recommended items when this
// request is being debugged. The slice is copied before the goroutine starts,
// as WriteGeneralLog does. If the caller later reorders or truncates items in
// place, the logger therefore does not race with it. The *module.Item values
// themselves are still shared.
func (d *DebugService) WriteRecommendLog(user *module.User, items []*module.Item, context *context.RecommendContext) {
	if d.logFlag {
		snapshot := make([]*module.Item, len(items))
		copy(snapshot, items)
		go d.doWriteRecommendLog(user, snapshot, context)
	}
}
// doWriteRecallLog emits one "recall" record per recall source. Each item is
// rendered as "<id>:<score>:<trigger_id>"; the trigger id comes from
// triggerMap and is empty when absent.
func (d *DebugService) doWriteRecallLog(user *module.User, items []*module.Item, context *context.RecommendContext, triggerMap map[module.ItemId]string) {
	d.writeGroupedItemLog("recall", user, items, context, func(item *module.Item) string {
		return fmt.Sprintf("%s:%f:%s", item.Id, item.Score, triggerMap[item.Id])
	})
}

// doWriteFilterLog emits one "filter" record per recall source, rendering each
// item as "<id>:<score>".
func (d *DebugService) doWriteFilterLog(user *module.User, items []*module.Item, context *context.RecommendContext) {
	d.writeGroupedItemLog("filter", user, items, context, func(item *module.Item) string {
		return fmt.Sprintf("%s:%f", item.Id, item.Score)
	})
}

// doWriteGeneralLog emits one "general_rank" record per recall source, rendering
// each item together with its algorithm scores.
func (d *DebugService) doWriteGeneralLog(user *module.User, items []*module.Item, context *context.RecommendContext) {
	d.writeGroupedItemLog("general_rank", user, items, context, formatItemWithAlgoScores)
}

// doWriteRankLog emits one "rank" record per recall source, rendering each item
// together with its algorithm scores.
func (d *DebugService) doWriteRankLog(user *module.User, items []*module.Item, context *context.RecommendContext) {
	d.writeGroupedItemLog("rank", user, items, context, formatItemWithAlgoScores)
}

// doWriteSortLog emits one "sort" record per recall source, rendering each item
// together with its algorithm scores.
func (d *DebugService) doWriteSortLog(user *module.User, items []*module.Item, context *context.RecommendContext) {
	d.writeGroupedItemLog("sort", user, items, context, formatItemWithAlgoScores)
}

// doWriteRecommendLog emits one "recommend" record per recall source, rendering
// each item together with its algorithm scores.
func (d *DebugService) doWriteRecommendLog(user *module.User, items []*module.Item, context *context.RecommendContext) {
	d.writeGroupedItemLog("recommend", user, items, context, formatItemWithAlgoScores)
}

// writeGroupedItemLog groups items by recall name and hands one record per
// group to d.logOutputer.
//
// Every record carries request_id, module, scene_id, request_time, uid,
// exp_id (only when an experiment result is present), retrieveid (the group's
// recall name) and items (the formatted items joined by ","). Groups are
// emitted in the order their recall name first appears in items, and nil
// items are skipped.
//
// The doWrite*Log callers run on their own goroutines, so a panic here is
// recovered and logged instead of terminating the process. A fresh map is
// built for every record because LogOutputer does not promise to finish with
// the map before WriteLog returns.
func (d *DebugService) writeGroupedItemLog(moduleName string, user *module.User, items []*module.Item, rc *context.RecommendContext, format func(*module.Item) string) {
	defer recoverDebugLogPanic()

	var names []string
	groups := make(map[string][]*module.Item)
	for _, item := range items {
		if item == nil {
			continue
		}
		name := item.GetRecallName()
		if _, seen := groups[name]; !seen {
			names = append(names, name)
		}
		groups[name] = append(groups[name], item)
	}
	if len(names) == 0 {
		return
	}

	header := map[string]interface{}{
		"request_id":   rc.RecommendId,
		"module":       moduleName,
		"scene_id":     rc.GetParameter("scene"),
		"request_time": d.requestTime,
		"uid":          string(user.Id),
	}
	if rc.ExperimentResult != nil {
		header["exp_id"] = rc.ExperimentResult.GetExpId()
	}

	infos := make([]string, 0, len(items))
	for _, name := range names {
		infos = infos[:0]
		for _, item := range groups[name] {
			infos = append(infos, format(item))
		}
		record := make(map[string]interface{}, len(header)+2)
		for k, v := range header {
			record[k] = v
		}
		record["retrieveid"] = name
		record["items"] = strings.Join(infos, ",")
		d.logOutputer.WriteLog(record)
	}
}

// formatItemWithAlgoScores renders item as "<id>:<score>:<algo scores JSON>".
// It falls back to "<id>:<score>" when the algorithm scores cannot be
// JSON-encoded.
func formatItemWithAlgoScores(item *module.Item) string {
	if b, err := json.Marshal(item.CloneAlgoScores()); err == nil {
		return fmt.Sprintf("%s:%f:%s", item.Id, item.Score, string(b))
	}
	return fmt.Sprintf("%s:%f", item.Id, item.Score)
}

// recoverDebugLogPanic logs a recovered panic together with its stack
// (newlines replaced by tabs so it fits on one log line). It must be invoked
// directly via defer for recover to take effect.
func recoverDebugLogPanic() {
	if err := recover(); err != nil {
		stack := string(debug.Stack())
		log.Error(fmt.Sprintf("error=%v, stack=%s", err, strings.ReplaceAll(stack, "\n", "\t")))
	}
}