in spark/common/src/main/java/org/apache/sedona/core/spatialRDD/SpatialRDD.java [232:326]
public void calc_partitioner(GridType gridType, int numPartitions) throws Exception {
if (numPartitions <= 0) {
throw new IllegalArgumentException("Number of partitions must be > 0");
}
if (this.boundaryEnvelope == null) {
throw new Exception(
"[AbstractSpatialRDD][spatialPartitioning] SpatialRDD boundary is null. Please call analyze() first.");
}
if (this.approximateTotalCount == -1) {
throw new Exception(
"[AbstractSpatialRDD][spatialPartitioning] SpatialRDD total count is unknown. Please call analyze() first.");
}
// Calculate the number of samples we need to take.
int sampleNumberOfRecords =
RDDSampleUtils.getSampleNumbers(
numPartitions, this.approximateTotalCount, this.sampleNumber);
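// Note: sampleNumber, when set explicitly on this RDD, overrides the default sample size
// derived from numPartitions and the approximate total count.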
// Take Sample
// RDD.takeSample implementation tends to scan the data multiple times to gather the exact
// number of samples requested. Repeated scans increase the latency of the join. This increase
// is significant for large datasets.
// See
// https://github.com/apache/spark/blob/412b0e8969215411b97efd3d0984dc6cac5d31e0/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L508
// Here, we choose to get samples faster over getting exactly the specified number of samples.
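// Spark's SamplingUtils.computeFractionForSampleSize returns a sampling rate that yields at
// least sampleNumberOfRecords samples with high probability.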
final double fraction =
SamplingUtils.computeFractionForSampleSize(
sampleNumberOfRecords, approximateTotalCount, false);
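// Ship only the envelopes (bounding boxes) of the sampled geometries to the driver,
// not the full geometries.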
List<Envelope> samples =
this.rawSpatialRDD
.sample(false, fraction)
.map(
new Function<T, Envelope>() {
@Override
public Envelope call(T geometry) throws Exception {
return geometry.getEnvelopeInternal();
}
})
.collect();
logger.info("Collected " + samples.size() + " samples");
// Add some padding at the top and right of the boundaryEnvelope to make
// sure all geometries lie within the half-open rectangle.
final Envelope paddedBoundary =
new Envelope(
boundaryEnvelope.getMinX(), boundaryEnvelope.getMaxX() + 0.01,
boundaryEnvelope.getMinY(), boundaryEnvelope.getMaxY() + 0.01);
switch (gridType) {
case EQUALGRID:
{
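// Uniform grid: grow the quad-tree over the padded boundary without using the
// collected samples (an empty sample list is passed).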
// Force the quad-tree to grow up to a certain level
// So the actual num of partitions might be slightly different
int minLevel = (int) Math.max(Math.log(numPartitions) / Math.log(4), 0);
QuadtreePartitioning quadtreePartitioning =
new QuadtreePartitioning(
new ArrayList<Envelope>(), paddedBoundary, numPartitions, minLevel);
StandardQuadTree tree = quadtreePartitioning.getPartitionTree();
partitioner = new QuadTreePartitioner(tree);
break;
}
case QUADTREE:
{
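// Data-aware quad-tree: split the padded boundary according to the sampled
// envelopes; leaf regions become the partitions.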
QuadtreePartitioning quadtreePartitioning =
new QuadtreePartitioning(samples, paddedBoundary, numPartitions);
StandardQuadTree tree = quadtreePartitioning.getPartitionTree();
partitioner = new QuadTreePartitioner(tree);
break;
}
case KDBTREE:
{
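// KDB-tree built from the sample envelopes; assignLeafIds() numbers the leaves
// that the partitioner maps geometries to.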
final KDB tree = new KDB(samples.size() / numPartitions, numPartitions, paddedBoundary);
for (final Envelope sample : samples) {
tree.insert(sample);
}
tree.assignLeafIds();
partitioner = new KDBTreePartitioner(tree);
break;
}
case QUADTREE_RTREE:
{
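// Quad-tree combined with an R-tree; neighborSampleNumber is a field configured
// elsewhere on this RDD.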
ExtendedQuadTree<Integer> extendedQuadTree =
new ExtendedQuadTree<>(paddedBoundary, numPartitions);
extendedQuadTree.build(neighborSampleNumber);
partitioner = new QuadTreeRTPartitioner(extendedQuadTree);
break;
}
default:
throw new Exception(
"[AbstractSpatialRDD][spatialPartitioning] Unsupported spatial partitioning method. "
+ "The following partitioning methods are not longer supported: R-Tree, Hilbert curve, Voronoi");
}
}
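A minimal usage sketch, assuming the usual Sedona call chain around this method (SpatialRDD's no-arg constructor, setRawSpatialRDD, and analyze(); the loading code is illustrative only):

// Illustrative sketch: rawGeometryRDD is an existing JavaRDD<Geometry> obtained elsewhere.
SpatialRDD<Geometry> spatialRDD = new SpatialRDD<>();
spatialRDD.setRawSpatialRDD(rawGeometryRDD);
spatialRDD.analyze();                              // fills boundaryEnvelope and approximateTotalCount
spatialRDD.calc_partitioner(GridType.KDBTREE, 16); // builds this.partitioner; does not repartition the data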