in plugin/connector/standalone/producer.go [44:71]
// Publish hands event to the broker under the event's subject and reports the
// outcome through callback.
//
// Behavior:
//   - If the producer is closed, an error is returned and callback is not
//     invoked.
//   - If the broker's PutMessage fails, callback.OnError is called with the
//     subject and the error, and the same error is returned.
//   - Otherwise callback.OnSuccess is called with a SendResult whose MessageId
//     is the base-10 rendering of the stored message's offset, and nil is
//     returned.
//
// A panic raised anywhere in the body (including inside the callbacks) is
// recovered and converted into the returned error.
func (p *Producer) Publish(ctx context.Context, event *ce.Event, callback *connector.SendCallback) (err error) {
	defer func() {
		if r := recover(); r != nil {
			// Format the recovered value r, not err: err may be nil at the
			// time of the panic, which would hide the actual cause.
			err = fmt.Errorf("callback function execute failed: %v", r)
		}
	}()

	if p.IsClosed() {
		return errors.New("fail to publish message, producer has been closed")
	}

	// err here is the named result (same scope), so := assigns rather than
	// shadows it.
	message, err := p.broker.PutMessage(event.Subject(), event)
	if err != nil {
		callback.OnError(&connector.ErrorResult{
			Topic: event.Subject(),
			Err:   err,
		})
		return err
	}

	sendResult := connector.SendResult{
		MessageId: strconv.FormatInt(message.GetOffset(), 10),
		Topic:     event.Subject(),
		Err:       nil,
	}
	callback.OnSuccess(&sendResult)
	return nil
}