in presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java [170:204]
/**
 * Lists the tables (Pulsar topics) available in the given schema.
 *
 * <p>The schema name is passed through {@code restoreNamespaceDelimiterIfNeeded} before being
 * handed to the Pulsar admin client. Every returned topic is reduced to its partitioned topic
 * name and de-duplicated, so topics that share a partitioned topic name (presumably the
 * individual partitions of one partitioned topic) yield a single table entry. The table name
 * is the topic's local name.
 *
 * @param session the connector session; not used by this method
 * @param schemaName the schema to list; when absent, or when equal to
 *     {@code INFORMATION_SCHEMA}, an empty list is returned
 * @return the tables found in the schema; empty when the admin client answers with HTTP 404
 *     for the schema
 * @throws PrestoException with {@code QUERY_REJECTED} when the admin client answers with HTTP 401
 * @throws RuntimeException for any other {@code PulsarAdminException}; the original exception
 *     is attached as the cause
 */
public List<SchemaTableName> listTables(ConnectorSession session, Optional<String> schemaName) {
    ImmutableList.Builder<SchemaTableName> builder = ImmutableList.builder();
    if (!schemaName.isPresent()) {
        return builder.build();
    }

    String schema = schemaName.get();
    if (schema.equals(INFORMATION_SCHEMA)) {
        // no-op for now but add pulsar connector specific system tables here
        return builder.build();
    }

    List<String> pulsarTopicList;
    try {
        pulsarTopicList = this.pulsarAdmin.topics()
                .getList(restoreNamespaceDelimiterIfNeeded(schema, pulsarConnectorConfig));
    } catch (PulsarAdminException e) {
        if (e.getStatusCode() == 404) {
            // An unknown schema is reported as "no tables" rather than as an error.
            log.warn("Schema " + schema + " does not exist");
            return builder.build();
        } else if (e.getStatusCode() == 401) {
            // Keep the admin exception as the cause so the rejection stays diagnosable.
            throw new PrestoException(QUERY_REJECTED,
                    String.format("Failed to get tables/topics in %s: Unauthorized", schema), e);
        }
        // getRootCause may return null when the exception has no nested cause (older
        // Commons Lang releases); fall back to the exception itself so building the
        // message can never fail with a NullPointerException that hides the real error.
        Throwable rootCause = ExceptionUtils.getRootCause(e);
        String reason = (rootCause != null ? rootCause : e).getLocalizedMessage();
        throw new RuntimeException("Failed to get tables/topics in " + schema + ": " + reason, e);
    }

    if (pulsarTopicList != null) {
        pulsarTopicList.stream()
                // Reduce each topic to its partitioned topic name before de-duplicating.
                .map(topic -> TopicName.get(topic).getPartitionedTopicName())
                .distinct()
                .forEach(topic -> builder.add(
                        new SchemaTableName(schema, TopicName.get(topic).getLocalName())));
    }
    return builder.build();
}