in src/main/java/org/apache/flink/connector/rocketmq/catalog/RocketMQCatalog.java [245:272]
public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath)
        throws TableNotExistException, TableNotPartitionedException, CatalogException {
    // Lists the partitions of the table backed by the topic named by
    // tablePath.getObjectName(): one CatalogPartitionSpec per key of the topic's offset
    // table, each holding a single entry "__queue_id__" -> <queue id as a string>.
    //
    // Any exception raised while querying the topic stats or building the specs is wrapped
    // in a CatalogException (with the original as its cause). The two table-related
    // exceptions in the throws clause are declared but never raised by this body.
    checkNotNull(tablePath, "Table path cannot be null");
    try {
        TopicStatsTable topicStatsTable =
                mqAdminExt.examineTopicStats(tablePath.getObjectName());
        return topicStatsTable.getOffsetTable().keySet().stream()
                .map(
                        messageQueue -> {
                            // A plain HashMap instead of double-brace initialization: the
                            // anonymous subclass created by "{{ ... }}" would keep a hidden
                            // reference to this catalog inside every returned spec.
                            HashMap<String, String> partitionSpec = new HashMap<>(1);
                            partitionSpec.put(
                                    "__queue_id__",
                                    String.valueOf(messageQueue.getQueueId()));
                            return new CatalogPartitionSpec(partitionSpec);
                        })
                .collect(Collectors.toList());
    } catch (Exception e) {
        if (e instanceof InterruptedException) {
            // The blanket catch would otherwise swallow the interrupt; restore the flag so
            // callers further up the stack can still observe it.
            Thread.currentThread().interrupt();
        }
        throw new CatalogException(
                String.format(
                        "Failed to list partitions of table %s by defaultMQAdminExt.",
                        tablePath.getFullName()),
                e);
    }
}