private ConnectorTableMetadata getTableMetadata()

in presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java [277:336]


    private ConnectorTableMetadata getTableMetadata(SchemaTableName schemaTableName, boolean withInternalColumns) {

        if (schemaTableName.getSchemaName().equals(INFORMATION_SCHEMA)) {
            return null;
        }
        String namespace = restoreNamespaceDelimiterIfNeeded(schemaTableName.getSchemaName(), pulsarConnectorConfig);

        TopicName topicName = TopicName.get(
                String.format("%s/%s", namespace, schemaTableName.getTableName()));

        List<String> topics;
        try {
            if (!PulsarConnectorUtils.isPartitionedTopic(topicName, this.pulsarAdmin)) {
                topics = this.pulsarAdmin.topics().getList(namespace);
            } else {
                topics = this.pulsarAdmin.topics().getPartitionedTopicList(namespace);
            }
        } catch (PulsarAdminException e) {
            if (e.getStatusCode() == 404) {
                throw new PrestoException(NOT_FOUND, "Schema " + namespace + " does not exist");
            } else if (e.getStatusCode() == 401) {
                throw new PrestoException(QUERY_REJECTED,
                        String.format("Failed to get topics in schema %s: Unauthorized", namespace));
            }
            throw new RuntimeException("Failed to get topics in schema " + namespace
                    + ": " + ExceptionUtils.getRootCause(e).getLocalizedMessage(), e);
        }

        if (!topics.contains(topicName.toString())) {
            log.error("Table %s not found",
                    String.format("%s/%s", namespace,
                            schemaTableName.getTableName()));
            throw new TableNotFoundException(schemaTableName);
        }

        SchemaInfo schemaInfo;
        try {
            schemaInfo = this.pulsarAdmin.schemas().getSchemaInfo(
                    String.format("%s/%s", namespace, schemaTableName.getTableName()));
        } catch (PulsarAdminException e) {
            if (e.getStatusCode() == 404) {
                // use default schema because there is no schema
                schemaInfo = PulsarSchemaHandlers.defaultSchema();
            } else if (e.getStatusCode() == 401) {
                throw new PrestoException(QUERY_REJECTED,
                        String.format("Failed to get pulsar topic schema information for topic %s/%s: Unauthorized",
                                namespace, schemaTableName.getTableName()));
            } else {
                throw new RuntimeException("Failed to get pulsar topic schema information for topic "
                        + String.format("%s/%s", namespace, schemaTableName.getTableName())
                        + ": " + ExceptionUtils.getRootCause(e).getLocalizedMessage(), e);
            }
        }
        List<ColumnMetadata> handles = getPulsarColumns(
                topicName, schemaInfo, withInternalColumns, PulsarColumnHandle.HandleKeyValueType.NONE
        );


        return new ConnectorTableMetadata(schemaTableName, handles);
    }