internal/queue/in_memory_queue.go (73 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one or more // contributor license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright ownership. // The ASF licenses this file to You under the Apache License, Version 2.0 // (the "License"); you may not use this file except in compliance with // the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package queue import ( "context" "fmt" "github.com/apache/incubator-eventmesh/eventmesh-server-go/log" "github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/constants" "github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/dal" "github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/dal/model" "github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/metrics" "github.com/gogf/gf/util/gconv" "github.com/reactivex/rxgo/v2" ) func init() { RegisterQueue(newInMemoryQueue()) } type inMemoryQueue struct { ch chan rxgo.Item observable rxgo.Observable workflowDAL dal.WorkflowDAL } func newInMemoryQueue() ObserveQueue { var p inMemoryQueue p.ch = make(chan rxgo.Item) p.observable = rxgo.FromChannel(p.ch) p.workflowDAL = dal.NewWorkflowDAL() return &p } // Name returns queue's name. func (q *inMemoryQueue) Name() string { return constants.QueueTypeInMemory } func (q *inMemoryQueue) Publish(tasks []*model.WorkflowTaskInstance) error { if len(tasks) == 0 { return nil } metrics.Add(constants.MetricsTaskQueue, fmt.Sprintf("%s_%s", q.Name(), constants.MetricsQueueSize), float64(len(tasks))) for _, t := range tasks { q.ch <- rxgo.Of(t) } return nil } func (q *inMemoryQueue) Ack(tasks *model.WorkflowTaskInstance) error { return nil } func (q *inMemoryQueue) Observe() { go func() { defer func() { if err := recover(); err != nil { log.Get(constants.LogQueue).Errorf("Observe error=%+v", err) } }() for item := range q.observable.Observe() { metrics.Dec(constants.MetricsTaskQueue, fmt.Sprintf("%s_%s", q.Name(), constants.MetricsQueueSize)) q.handle(item) } }() } func (q *inMemoryQueue) handle(item rxgo.Item) { v, ok := item.V.(*model.WorkflowTaskInstance) if !ok { return } log.Get(constants.LogQueue).Infof("handle=%s", gconv.String(v)) if v.ID != 0 { if err := q.workflowDAL.UpdateTaskInstance(dal.GetDalClient(), v); err != nil { log.Get(constants.LogQueue).Errorf("Observe UpdateTaskInstance error=%v", err) } return } if err := q.workflowDAL.InsertTaskInstance(context.Background(), v); err != nil { log.Get(constants.LogQueue).Errorf("Observe InsertTaskInstance error=%v", err) } }