public ConnectorSplitSource getSplits()

in presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java [95:146]


    public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHandle, ConnectorSession session,
                                          ConnectorTableLayoutHandle layout,
                                          ConnectorSplitManager.SplitSchedulingStrategy splitSchedulingStrategy) {

        int numSplits = this.pulsarConnectorConfig.getTargetNumSplits();

        PulsarTableLayoutHandle layoutHandle = (PulsarTableLayoutHandle) layout;
        PulsarTableHandle tableHandle = layoutHandle.getTable();
        TupleDomain<ColumnHandle> tupleDomain = layoutHandle.getTupleDomain();

        String namespace = restoreNamespaceDelimiterIfNeeded(tableHandle.getSchemaName(), pulsarConnectorConfig);
        TopicName topicName = TopicName.get("persistent", NamespaceName.get(namespace),
                tableHandle.getTableName());

        SchemaInfo schemaInfo;

        try {
            schemaInfo = this.pulsarAdmin.schemas().getSchemaInfo(
                    String.format("%s/%s", namespace, tableHandle.getTableName()));
        } catch (PulsarAdminException e) {
            if (e.getStatusCode() == 401) {
                throw new PrestoException(QUERY_REJECTED,
                        String.format("Failed to get pulsar topic schema for topic %s/%s: Unauthorized",
                                namespace, tableHandle.getTableName()));
            } else if (e.getStatusCode() == 404) {
                schemaInfo = PulsarSchemaHandlers.defaultSchema();
            } else {
                throw new RuntimeException("Failed to get pulsar topic schema for topic "
                        + String.format("%s/%s", namespace, tableHandle.getTableName())
                        + ": " + ExceptionUtils.getRootCause(e).getLocalizedMessage(), e);
            }
        }

        Collection<PulsarSplit> splits;
        try {
            OffloadPolicies offloadPolicies = this.pulsarAdmin.namespaces()
                                                .getOffloadPolicies(topicName.getNamespace());
            if (!PulsarConnectorUtils.isPartitionedTopic(topicName, this.pulsarAdmin)) {
                splits = getSplitsNonPartitionedTopic(
                        numSplits, topicName, tableHandle, schemaInfo, tupleDomain, offloadPolicies);
                log.debug("Splits for non-partitioned topic %s: %s", topicName, splits);
            } else {
                splits = getSplitsPartitionedTopic(
                        numSplits, topicName, tableHandle, schemaInfo, tupleDomain, offloadPolicies);
                log.debug("Splits for partitioned topic %s: %s", topicName, splits);
            }
        } catch (Exception e) {
            log.error(e, "Failed to get splits");
            throw new RuntimeException(e);
        }
        return new FixedSplitSource(splits);
    }