in src/main/java/org/apache/flink/connector/rocketmq/source/InnerConsumerImpl.java [68:110]
public InnerConsumerImpl(Configuration configuration) {
this.configuration = configuration;
this.commonExecutorService = buildExecutorService(configuration);
String accessKey = configuration.getString(RocketMQSourceOptions.OPTIONAL_ACCESS_KEY);
String secretKey = configuration.getString(RocketMQSourceOptions.OPTIONAL_SECRET_KEY);
// Note: sync pull thread num may not enough
if (!StringUtils.isNullOrWhitespaceOnly(accessKey)
&& !StringUtils.isNullOrWhitespaceOnly(secretKey)) {
AclClientRPCHook aclClientRpcHook =
new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));
this.adminExt = new DefaultMQAdminExt(aclClientRpcHook);
this.consumer = new DefaultLitePullConsumer(aclClientRpcHook);
} else {
this.adminExt = new DefaultMQAdminExt();
this.consumer = new DefaultLitePullConsumer();
}
String groupId = configuration.getString(RocketMQSourceOptions.CONSUMER_GROUP);
String endPoints = configuration.getString(RocketMQSourceOptions.ENDPOINTS);
this.consumer.setNamesrvAddr(endPoints);
this.consumer.setConsumerGroup(groupId);
this.consumer.setAutoCommit(false);
this.consumer.setVipChannelEnabled(false);
this.consumer.setInstanceName(
String.join(
"#",
ManagementFactory.getRuntimeMXBean().getName(),
groupId,
UUID.randomUUID().toString()));
this.adminExt.setNamesrvAddr(endPoints);
this.adminExt.setAdminExtGroup(groupId);
this.adminExt.setVipChannelEnabled(false);
this.adminExt.setInstanceName(
String.join(
"#",
ManagementFactory.getRuntimeMXBean().getName(),
groupId,
UUID.randomUUID().toString()));
}