in plugin/connector/rocketmq/factory.go [39:72]
func (f *Factory) Setup(name string, dec plugin.Decoder) error {
if dec == nil {
return errors.New(" producer config decoder empty")
}
properties := make(map[string]string)
if err := dec.Decode(properties); err != nil {
return err
}
f.properties = properties
consumer := NewConsumer()
err := consumer.InitConsumer(f.properties)
if err != nil {
return err
}
err = consumer.Start()
if err != nil {
return err
}
f.consumer = consumer
producer := NewProducer()
err = producer.InitProducer(f.properties)
if err != nil {
return err
}
err = producer.Start()
if err != nil {
return err
}
f.producer = producer
return nil
}