datasource/datahub/datahub.go (326 lines of code) (raw):

// Package datahub wraps the Aliyun DataHub SDK with a named-instance
// registry, shard-aware round-robin record publishing, and a local
// sync-log fallback for records that fail to send.
package datahub

import (
	"encoding/base64"
	"fmt"
	"sync"
	"sync/atomic"
	"time"

	"github.com/alibaba/pairec/v2/context"
	"github.com/alibaba/pairec/v2/log"
	"github.com/alibaba/pairec/v2/module"
	"github.com/alibaba/pairec/v2/recconf"
	"github.com/alibaba/pairec/v2/service/hook"
	"github.com/alibaba/pairec/v2/utils/synclog"
	alidatahub "github.com/aliyun/aliyun-datahub-sdk-go/datahub"
)

// Datahub is a client bound to one DataHub project/topic. It keeps the
// topic's active shard list refreshed in the background and distributes
// records across shards round-robin via an atomic counter.
//
// NOTE(review): active and shards are read/written from multiple
// goroutines (loopListShards vs. SendMessage/Stop) without
// synchronization — a benign-looking but real data race; left as-is
// because other files in this package may access these unexported
// fields. TODO: confirm and migrate to atomic.Bool / a mutex.
type Datahub struct {
	accessId    string
	accessKey   string
	endpoint    string
	projectName string
	topicName   string
	schemas     []recconf.DatahubTopicSchema
	datahubApi  alidatahub.DataHubApi
	shards      []alidatahub.ShardEntry
	index       uint64 // round-robin cursor, advanced with atomic.AddUint64
	recordSchema *alidatahub.RecordSchema
	active      bool // set false to stop the background shard refresher
	name        string
	syncLog     *synclog.SyncLog
}

var (
	mu               sync.RWMutex
	datahubInstances = make(map[string]*Datahub)
)

// GetDatahub returns the registered instance for name, or an error if
// no instance with that name exists.
func GetDatahub(name string) (*Datahub, error) {
	mu.RLock()
	defer mu.RUnlock()
	if _, ok := datahubInstances[name]; !ok {
		return nil, fmt.Errorf("Datahub not found, name:%s", name)
	}
	return datahubInstances[name], nil
}

// RegisterDatahub adds dh to the registry under name and records the
// name on dh (so Stop can later remove it). An existing registration
// with the same name is kept untouched.
func RegisterDatahub(name string, dh *Datahub) {
	mu.Lock()
	defer mu.Unlock()
	if _, ok := datahubInstances[name]; !ok {
		datahubInstances[name] = dh
		dh.name = name
	}
}

// RemoveDatahub stops the named instance's shard refresher and drops it
// from the registry. Unknown names are ignored.
func RemoveDatahub(name string) {
	mu.Lock()
	defer mu.Unlock()
	if _, ok := datahubInstances[name]; ok {
		datahubInstances[name].StopLoopListShards()
		delete(datahubInstances, name)
	}
}

// NewDatahub builds an unconnected client for the given project/topic.
// Call Init before use. A non-empty schemas list makes Init create the
// topic if it does not already exist.
func NewDatahub(accessId, accessKey, endpoint, project, topic string, schemas []recconf.DatahubTopicSchema) *Datahub {
	p := &Datahub{
		accessId:    accessId,
		accessKey:   accessKey,
		endpoint:    endpoint,
		projectName: project,
		topicName:   topic,
		index:       0,
		schemas:     schemas,
	}
	return p
}

// Init connects to DataHub, resolves (or creates) the topic schema,
// initializes the on-disk sync log used to replay failed sends, and
// starts the background shard-list refresher.
func (d *Datahub) Init() error {
	var account alidatahub.Account
	var err error
	// Empty credentials mean "use the AK-less account" (e.g. STS/instance role).
	if d.accessId == "" || d.accessKey == "" {
		account, err = NewAklessAccount()
		if err != nil {
			return err
		}
	} else {
		account = alidatahub.NewAliyunAccount(d.accessId, d.accessKey)
	}
	config := alidatahub.NewDefaultConfig()
	config.CompressorType = alidatahub.DEFLATE
	config.EnableBinary = false
	config.HttpClient = alidatahub.DefaultHttpClient()
	dh := alidatahub.NewClientWithConfig(d.endpoint, config, account)
	d.datahubApi = dh

	if len(d.schemas) > 0 {
		// Create-if-missing path; retry once after a short pause to ride
		// out transient service errors.
		if err := d.createTopic(); err != nil {
			time.Sleep(2 * time.Second)
			if err = d.createTopic(); err != nil {
				return err
			}
		}
	} else {
		topic, err := dh.GetTopic(d.projectName, d.topicName)
		if err != nil {
			return err
		}
		d.recordSchema = topic.RecordSchema
	}

	d.active = true
	dir := fmt.Sprintf("./tmp/%s/%s", d.projectName, d.topicName)
	// Named sl to avoid shadowing the imported synclog package.
	sl := synclog.NewSyncLog(dir, d.consumeSyncLog)
	if err := sl.Init(); err != nil {
		log.Error(fmt.Sprintf("project=%s\ttopic=%s\terror=init sync log error(%v)", d.projectName, d.topicName, err))
		return err
	}
	d.syncLog = sl
	go d.loopListShards()
	return nil
}

// createTopic fetches the topic's schema, creating the topic from the
// configured schemas (3 shards, 3-day lifecycle) when it is absent.
func (d *Datahub) createTopic() error {
	getTopicResult, err := d.datahubApi.GetTopic(d.projectName, d.topicName)
	if err != nil {
		// GetTopic failed — assume the topic does not exist and create it.
		recordSchema := alidatahub.NewRecordSchema()
		for _, schema := range d.schemas {
			switch schema.Type {
			case "string":
				recordSchema.AddField(alidatahub.Field{Name: schema.Field, Type: alidatahub.STRING, AllowNull: true})
			case "integer":
				recordSchema.AddField(alidatahub.Field{Name: schema.Field, Type: alidatahub.INTEGER, AllowNull: true})
			case "bigint":
				recordSchema.AddField(alidatahub.Field{Name: schema.Field, Type: alidatahub.BIGINT, AllowNull: true})
			case "double":
				recordSchema.AddField(alidatahub.Field{Name: schema.Field, Type: alidatahub.DOUBLE, AllowNull: true})
			case "float":
				recordSchema.AddField(alidatahub.Field{Name: schema.Field, Type: alidatahub.FLOAT, AllowNull: true})
			case "timestamp":
				recordSchema.AddField(alidatahub.Field{Name: schema.Field, Type: alidatahub.TIMESTAMP, AllowNull: true})
			}
		}
		if _, err := d.datahubApi.CreateTupleTopic(d.projectName, d.topicName, fmt.Sprintf("create topic %s", d.topicName), 3, 3, recordSchema); err != nil {
			return err
		}
		d.recordSchema = recordSchema
	} else {
		d.recordSchema = getTopicResult.RecordSchema
	}
	return nil
}

// DataHubApi exposes the underlying SDK client.
func (d *Datahub) DataHubApi() alidatahub.DataHubApi {
	return d.datahubApi
}

// Shards returns the IDs of the currently cached active shards.
func (d *Datahub) Shards() (ret []string) {
	for _, shard := range d.shards {
		ret = append(ret, shard.ShardId)
	}
	return
}

// loopListShards refreshes the active shard cache once a minute while
// d.active is true. Ten consecutive ListShard failures stop the
// instance entirely (Stop also unregisters it).
func (d *Datahub) loopListShards() error {
	i := 0
	for d.active {
		ls, err := d.datahubApi.ListShard(d.projectName, d.topicName)
		if err != nil {
			log.Error(fmt.Sprintf("project=%s\ttopic=%s\terror=get shard list failed(%v)", d.projectName, d.topicName, err))
			i++
			time.Sleep(time.Second * 10)
			if i >= 10 {
				d.Stop()
			}
			continue
		}
		var shards []alidatahub.ShardEntry
		for _, shard := range ls.Shards {
			if shard.State == alidatahub.ACTIVE {
				shards = append(shards, shard)
			}
		}
		// Keep the previous cache rather than publishing an empty list.
		if len(shards) > 0 {
			d.shards = shards
		}
		i = 0
		time.Sleep(time.Minute)
	}
	return nil
}

// Stop halts the shard refresher and removes this instance from the
// registry.
func (d *Datahub) Stop() {
	d.StopLoopListShards()
	RemoveDatahub(d.name)
}

// StopLoopListShards signals the background refresher to exit after its
// current iteration.
func (d *Datahub) StopLoopListShards() {
	d.active = false
}

// SendMessage publishes messages (field name -> value, per the topic
// schema) round-robin across the cached shards. Throughput-limit errors
// are retried up to 3 times; any other failure (or retry exhaustion)
// spools the messages to the sync log for later replay.
func (d *Datahub) SendMessage(messages []map[string]interface{}) {
	records := make([]alidatahub.IRecord, 0, len(messages))
	shards := d.shards
	// The refresher may not have populated the cache yet; poll briefly.
	for i := 0; i < 3; i++ {
		if len(shards) > 0 {
			break
		}
		shards = d.shards
		time.Sleep(time.Second)
	}
	if len(shards) == 0 {
		log.Error("topic shards empty")
		return
	}
	for _, message := range messages {
		i := atomic.AddUint64(&d.index, 1)
		shard := shards[(i)%uint64(len(shards))]
		record := alidatahub.NewTupleRecord(d.recordSchema, 0)
		record.ShardId = shard.ShardId
		for k, v := range message {
			record.SetValueByName(k, v)
		}
		records = append(records, record)
	}
	maxReTry := 3
	retryNum := 0
	// retrySendMessage spools every message to the sync log so it can be
	// replayed by consumeSyncLog.
	retrySendMessage := func() {
		for _, msg := range messages {
			if err := d.syncLog.Write(NewSyncLogDatahubItem(msg)); err != nil {
				log.Error(fmt.Sprintf("project=%s\ttopic=%s\tmsg=write sync log failed(%v)", d.projectName, d.topicName, err))
			}
		}
	}
	for retryNum < maxReTry {
		result, err := d.datahubApi.PutRecords(d.projectName, d.topicName, records)
		if err != nil {
			if _, ok := err.(*alidatahub.LimitExceededError); ok {
				retryNum++
				time.Sleep(2 * time.Second)
				continue
			} else {
				log.Warning(fmt.Sprintf("project=%s\ttopic=%s\tmsg=put record failed(%v)", d.projectName, d.topicName, err))
				retrySendMessage()
				return
			}
		}
		if len(result.FailedRecords) > 0 {
			// Fixed format: original was missing the tab between code= and project=.
			log.Error(fmt.Sprintf("put successful num is %d, put records failed num is %d,msg=%s\tcode=%s\tproject=%s\ttopic=%s\n",
				len(records)-result.FailedRecordCount, result.FailedRecordCount,
				result.FailedRecords[0].ErrorMessage, result.FailedRecords[0].ErrorCode,
				d.projectName, d.topicName))
		}
		break
	}
	if retryNum >= maxReTry {
		log.Warning(fmt.Sprintf("project=%s\ttopic=%s\tmsg=put record failed", d.projectName, d.topicName))
		retrySendMessage()
	}
}

// consumeSyncLog is the sync-log replay callback: it decodes one logged
// message and resends it. It always returns nil so a bad entry is
// logged and skipped rather than blocking the log.
func (d *Datahub) consumeSyncLog(data []byte) error {
	datahubItem := NewSyncLogDatahubItem(nil)
	if err := datahubItem.Parse(data); err != nil {
		log.Error(fmt.Sprintf("parse datahub item failed(%v), data(%s), len:%d,project:%s, topic:%s", err, base64.StdEncoding.EncodeToString(data), len(data), d.projectName, d.topicName))
		return nil
	}
	err := d.doSendSingleMessage(datahubItem.data)
	if err != nil {
		log.Error(fmt.Sprintf("project=%s\ttopic=%s\tmsg=put record failed(%v)", d.projectName, d.topicName, err))
	}
	return nil
}

// doSendSingleMessage sends one message with up to 2 attempts, retrying
// on throughput-limit errors and partial failures. Non-retryable errors
// are returned to the caller.
func (d *Datahub) doSendSingleMessage(message map[string]interface{}) error {
	records := make([]alidatahub.IRecord, 0, 1)
	shards := d.shards
	for i := 0; i < 3; i++ {
		if len(shards) > 0 {
			break
		}
		shards = d.shards
		time.Sleep(time.Second)
	}
	if len(shards) == 0 {
		return fmt.Errorf("topic shards empty")
	}
	i := atomic.AddUint64(&d.index, 1)
	shard := shards[(i)%uint64(len(shards))]
	record := alidatahub.NewTupleRecord(d.recordSchema, 0)
	record.ShardId = shard.ShardId
	for k, v := range message {
		record.SetValueByName(k, v)
	}
	records = append(records, record)
	maxReTry := 2
	retryNum := 0
	for retryNum < maxReTry {
		result, err := d.datahubApi.PutRecords(d.projectName, d.topicName, records)
		if err != nil {
			if _, ok := err.(*alidatahub.LimitExceededError); ok {
				log.Error("maybe qps exceed limit,retry")
				retryNum++
				time.Sleep(2 * time.Second)
				continue
			} else {
				return err
			}
		}
		if len(result.FailedRecords) > 0 {
			retryNum++
			continue
		}
		break
	}
	if retryNum >= maxReTry {
		return fmt.Errorf("put record failed")
	}
	return nil
}

// Load creates and registers a Datahub instance for every entry in
// config.DatahubConfs that is not already registered. It panics on an
// Init failure (startup-time configuration error).
//
// Fixed vs. original: the registry is now accessed only through
// GetDatahub/RegisterDatahub, which (a) holds the package mutex and
// (b) records the instance name so a later Stop() can unregister it —
// the original wrote the map directly, unlocked, and left name empty.
func Load(config *recconf.RecommendConfig) {
	for name, conf := range config.DatahubConfs {
		if _, err := GetDatahub(name); err == nil {
			continue
		}
		m := NewDatahub(conf.AccessId, conf.AccessKey, conf.Endpoint, conf.ProjectName, conf.TopicName, conf.Schemas)
		if err := m.Init(); err != nil {
			panic(err)
		}
		RegisterDatahub(name, m)
	}
}

// FeatureLogDatahubFunc formats and sends feature-log data for one
// recommend request to a Datahub instance.
type FeatureLogDatahubFunc func(*Datahub, *module.User, []*module.Item, *context.RecommendContext)

// FeatureLogToDatahub registers f as a recommend-clean hook bound to
// the named Datahub instance. It panics if the instance is unknown.
func FeatureLogToDatahub(datahubName string, f FeatureLogDatahubFunc) {
	dh, err := GetDatahub(datahubName)
	if err != nil {
		panic(fmt.Sprintf("get datahub error, :%v", err))
	}
	hook.AddRecommendCleanHook(func(datahub *Datahub, f FeatureLogDatahubFunc) hook.RecommendCleanHookFunc {
		return func(context *context.RecommendContext, params ...interface{}) {
			user := params[0].(*module.User)
			items := params[1].([]*module.Item)
			f(datahub, user, items, context)
		}
	}(dh, f))
}