plugin/connector/standalone/producer.go (80 lines of code) (raw):
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package standalone
import (
"context"
"errors"
"fmt"
"strconv"
"time"
ce "github.com/cloudevents/sdk-go/v2"
"go.uber.org/atomic"
"github.com/apache/incubator-eventmesh/eventmesh-server-go/plugin/connector"
)
// Producer standalone producer
type Producer struct {
broker *Broker
started *atomic.Bool
}
func NewProducer() *Producer {
return &Producer{
broker: GetBroker(),
started: atomic.NewBool(false),
}
}
func (p *Producer) Publish(ctx context.Context, event *ce.Event, callback *connector.SendCallback) (err error) {
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("callback function execute failed: %v", err)
}
}()
if p.IsClosed() {
err = errors.New("fail to publish message, producer has been closed")
return
}
message, err := p.broker.PutMessage(event.Subject(), event)
if err != nil {
callback.OnError(&connector.ErrorResult{
Topic: event.Subject(),
Err: err,
})
return
}
sendResult := connector.SendResult{
MessageId: strconv.FormatInt(message.GetOffset(), 10),
Topic: event.Subject(),
Err: nil,
}
callback.OnSuccess(&sendResult)
return
}
func (p *Producer) SendOneway(ctx context.Context, event *ce.Event) (err error) {
_, err = p.broker.PutMessage(event.Subject(), event)
return
}
func (p *Producer) Request(ctx context.Context, event *ce.Event, callback *connector.RequestReplyCallback, timeout time.Duration) error {
return fmt.Errorf("request is not supported in standalone connector")
}
func (p *Producer) Reply(ctx context.Context, event *ce.Event, callback *connector.SendCallback) error {
return fmt.Errorf("reply is not supported in standalone connector")
}
func (p *Producer) CheckTopicExist(topicName string) (exist bool, err error) {
return p.broker.ExistTopic(topicName), nil
}
func (p *Producer) SetExtFields() error {
// No-Op for standalone producer
return nil
}
func (p *Producer) InitProducer(properties map[string]string) error {
// No-Op for standalone producer
return nil
}
func (p *Producer) Start() error {
p.started.CAS(false, true)
return nil
}
func (p *Producer) Shutdown() error {
p.started.CAS(true, false)
return nil
}
func (p *Producer) IsStarted() bool {
return p.started.Load()
}
func (p *Producer) IsClosed() bool {
return !p.started.Load()
}