in presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java [95:146]
/**
 * Computes the splits for the Pulsar topic backing the table referenced by {@code layout}.
 *
 * <p>The topic schema is looked up through the Pulsar admin client first; when that lookup fails
 * with status code 404 the result of {@code PulsarSchemaHandlers.defaultSchema()} is used instead.
 * The namespace offload policies are then fetched and split generation is delegated to
 * {@code getSplitsPartitionedTopic} or {@code getSplitsNonPartitionedTopic}, depending on what
 * {@code PulsarConnectorUtils.isPartitionedTopic} reports for the topic.
 *
 * @param transactionHandle not read by this method
 * @param session not read by this method
 * @param layout table layout; cast unconditionally to {@code PulsarTableLayoutHandle}
 * @param splitSchedulingStrategy not read by this method
 * @return a {@code FixedSplitSource} wrapping the computed splits
 * @throws PrestoException with {@code QUERY_REJECTED} when the schema lookup fails with status code 401
 * @throws RuntimeException when the schema lookup fails with any status code other than 401 or 404,
 *         or when anything goes wrong while computing the splits
 */
public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHandle, ConnectorSession session,
                                      ConnectorTableLayoutHandle layout,
                                      ConnectorSplitManager.SplitSchedulingStrategy splitSchedulingStrategy) {
    int numSplits = this.pulsarConnectorConfig.getTargetNumSplits();

    PulsarTableLayoutHandle layoutHandle = (PulsarTableLayoutHandle) layout;
    PulsarTableHandle tableHandle = layoutHandle.getTable();
    TupleDomain<ColumnHandle> tupleDomain = layoutHandle.getTupleDomain();

    String namespace = restoreNamespaceDelimiterIfNeeded(tableHandle.getSchemaName(), pulsarConnectorConfig);
    String tableName = tableHandle.getTableName();
    TopicName topicName = TopicName.get("persistent", NamespaceName.get(namespace), tableName);

    // "<namespace>/<table>" is used both for the schema lookup and in every error message below.
    String schemaTopic = String.format("%s/%s", namespace, tableName);

    SchemaInfo schemaInfo;
    try {
        schemaInfo = this.pulsarAdmin.schemas().getSchemaInfo(schemaTopic);
    } catch (PulsarAdminException e) {
        int statusCode = e.getStatusCode();
        if (statusCode == 401) {
            // Keep the admin exception as the cause so the original stack trace is not lost.
            throw new PrestoException(QUERY_REJECTED,
                    "Failed to get pulsar topic schema for topic " + schemaTopic + ": Unauthorized", e);
        } else if (statusCode == 404) {
            // No schema found for the topic: fall back to the default schema.
            schemaInfo = PulsarSchemaHandlers.defaultSchema();
        } else {
            // getRootCause may return null for a throwable without a nested cause (depends on the
            // library version); fall back to the exception itself so the real failure is reported
            // instead of a NullPointerException.
            Throwable rootCause = ExceptionUtils.getRootCause(e);
            String detail = (rootCause != null ? rootCause : e).getLocalizedMessage();
            throw new RuntimeException("Failed to get pulsar topic schema for topic "
                    + schemaTopic + ": " + detail, e);
        }
    }

    Collection<PulsarSplit> splits;
    try {
        OffloadPolicies offloadPolicies = this.pulsarAdmin.namespaces()
                .getOffloadPolicies(topicName.getNamespace());
        if (!PulsarConnectorUtils.isPartitionedTopic(topicName, this.pulsarAdmin)) {
            splits = getSplitsNonPartitionedTopic(
                    numSplits, topicName, tableHandle, schemaInfo, tupleDomain, offloadPolicies);
            log.debug("Splits for non-partitioned topic %s: %s", topicName, splits);
        } else {
            splits = getSplitsPartitionedTopic(
                    numSplits, topicName, tableHandle, schemaInfo, tupleDomain, offloadPolicies);
            log.debug("Splits for partitioned topic %s: %s", topicName, splits);
        }
    } catch (Exception e) {
        log.error(e, "Failed to get splits");
        throw new RuntimeException(e);
    }
    return new FixedSplitSource(splits);
}