in rocketmq-logstash-integration/rocketmq-logstash-output/src/main/java/org/apache/rocketmq/logstashplugin/output/RocketMQ.java [92:124]
public RocketMQ(final String id, final Configuration configuration, final Context context) throws MQClientException {
this.id = id;
this.namesrvAddr = configuration.get(CONFIG_NAMESRV_ADDR);
this.topic = configuration.get(CONFIG_TOPIC);
this.tag = configuration.get(CONFIG_TAG);
this.batchSize = configuration.get(CONFIG_BATCH_SIZE);
this.sendTimeoutMillis = configuration.get(CONFIG_TIMEOUT);
this.retries = configuration.get(CONFIG_RETRY_TIMES);
this.sendMode = configuration.get(CONFIG_SEND_MODE);
this.group = configuration.get(CONFIG_GROUP);
this.codec = configuration.get(CONFIG_CODEC);
producer = new DefaultMQProducer(this.group);
producer.setNamesrvAddr(this.namesrvAddr);
if (null != this.retries) {
try {
final int retryTimes = Math.toIntExact(this.retries);
producer.setRetryTimesWhenSendFailed(retryTimes);
producer.setRetryTimesWhenSendAsyncFailed(retryTimes);
} catch (Exception e) {
}
}
try {
producer.start();
} catch (MQClientException e) {
logger.error("start producer failed", e);
throw e;
}
logger.warn("producer started. {}", producer);
}