internal/dal/catalog.go (142 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one or more // contributor license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright ownership. // The ASF licenses this file to You under the Apache License, Version 2.0 // (the "License"); you may not use this file except in compliance with // the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package dal import ( "context" "fmt" "github.com/apache/incubator-eventmesh/eventmesh-catalog-go/internal/constants" "github.com/apache/incubator-eventmesh/eventmesh-catalog-go/internal/dal/model" "github.com/apache/incubator-eventmesh/eventmesh-catalog-go/internal/util" "github.com/apache/incubator-eventmesh/eventmesh-catalog-go/pkg/asyncapi" v2 "github.com/apache/incubator-eventmesh/eventmesh-catalog-go/pkg/asyncapi/v2" "github.com/gogf/gf/util/gconv" "gorm.io/gorm" "time" ) const maxSize = 100 type CatalogDAL interface { Select(ctx context.Context, eventID int) (*model.Event, error) SelectList(ctx context.Context, page int, size int) ([]model.Event, int, error) Save(ctx context.Context, record *model.Event) error SelectOperations(ctx context.Context, serviceName string, operationID string) ([]*model.EventCatalog, error) } func NewCatalogDAL() CatalogDAL { var w catalogDALImpl return &w } type catalogDALImpl struct { } func (c *catalogDALImpl) Select(ctx context.Context, eventID int) (*model.Event, error) { var condition = model.Event{ID: eventID} var r model.Event if err := catalogDB.WithContext(ctx).Where(&condition).First(&r).Error; err != nil { if err == gorm.ErrRecordNotFound { return nil, nil } return nil, err } return &r, nil } func (c *catalogDALImpl) SelectList(ctx context.Context, page int, size int) ([]model.Event, int, error) { var r []model.Event db := catalogDB.WithContext(ctx).Where("1=1") db = db.Where("status = ?", constants.NormalStatus) if size > maxSize { size = maxSize } if page == 0 { page = 1 } var count int64 db = db.Limit(size).Offset(size * (page - 1)).Order("update_time DESC") if err := db.Find(&r).Limit(-1).Offset(-1).Count(&count).Error; err != nil { if err == gorm.ErrRecordNotFound { return nil, 0, nil } return nil, 0, err } return r, int(count), nil } func (c *catalogDALImpl) SelectOperations(ctx context.Context, serviceName string, operationID string) ([]*model.EventCatalog, error) { var res []*model.EventCatalog var condition = model.EventCatalog{ServiceName: serviceName, OperationID: operationID} if err := catalogDB.WithContext(ctx).Where(&condition).Find(&res).Error; err != nil { return nil, err } return res, nil } func (c *catalogDALImpl) Save(ctx context.Context, record *model.Event) error { return catalogDB.WithContext(ctx).Transaction(func(tx *gorm.DB) error { if record.ID > 0 { if err := c.delete(tx, record); err != nil { return err } } return c.create(tx, record) }) } func (c *catalogDALImpl) insert(tx *gorm.DB, record *model.EventCatalog) error { return tx.Create(record).Error } func (c *catalogDALImpl) insertEvent(tx *gorm.DB, record *model.Event) error { return tx.Create(record).Error } func (c *catalogDALImpl) delete(tx *gorm.DB, record *model.Event) error { var handlers []func() error handlers = append(handlers, func() error { cond := model.Event{Status: constants.InvalidStatus, UpdateTime: time.Now()} return tx.Where("id = ?", record.ID).Updates(&cond).Error }, func() error { cond := model.EventCatalog{Status: constants.InvalidStatus, UpdateTime: time.Now()} return tx.Where("service_name = ?", record.Title).Updates(&cond).Error }) return util.GoAndWait(handlers...) } func (c *catalogDALImpl) create(tx *gorm.DB, record *model.Event) error { if len(record.Definition) == 0 { return nil } var doc = new(v2.Document) if err := v2.Decode(gconv.Bytes(record.Definition), &doc); err != nil { return err } if len(doc.Channels()) == 0 { return nil } var handlers []func() error handlers = append(handlers, func() error { return c.insertEvent(tx, c.buildEvent(record.Definition, doc)) }) for _, channel := range doc.Channels() { for _, operation := range channel.Operations() { var eventCatalog = c.buildEventCatalog(doc.Info().Title(), channel, operation) handlers = append(handlers, func() error { return c.insert(tx, eventCatalog) }) } } return util.GoAndWait(handlers...) } func (c *catalogDALImpl) buildEventCatalog(serviceName string, channel asyncapi.Channel, operation asyncapi.Operation) *model.EventCatalog { var record model.EventCatalog record.ServiceName = serviceName record.OperationID = fmt.Sprintf("file://%s.yaml#%s", serviceName, operation.ID()) record.ChannelName = channel.ID() record.Type = string(operation.Type()) record.Status = constants.NormalStatus record.CreateTime = time.Now() record.UpdateTime = time.Now() record.Schema = gconv.String(operation.Messages()[0].Payload()) return &record } func (c *catalogDALImpl) buildEvent(definition string, doc asyncapi.Document) *model.Event { var record model.Event record.Title = doc.Info().Title() record.FileName = fmt.Sprintf("%s.yaml", record.Title) record.Definition = definition record.Status = constants.NormalStatus record.Version = doc.Info().Version() record.CreateTime = time.Now() record.UpdateTime = time.Now() return &record }