func()

in plugin/connector/standalone/producer.go [44:71]


func (p *Producer) Publish(ctx context.Context, event *ce.Event, callback *connector.SendCallback) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("callback function execute failed: %v", err)
		}
	}()

	if p.IsClosed() {
		err = errors.New("fail to publish message, producer has been closed")
		return
	}

	message, err := p.broker.PutMessage(event.Subject(), event)
	if err != nil {
		callback.OnError(&connector.ErrorResult{
			Topic: event.Subject(),
			Err:   err,
		})
		return
	}
	sendResult := connector.SendResult{
		MessageId: strconv.FormatInt(message.GetOffset(), 10),
		Topic:     event.Subject(),
		Err:       nil,
	}
	callback.OnSuccess(&sendResult)
	return
}