in spark/common/src/main/java/org/apache/sedona/viz/core/VisualizationOperator.java [313:524]
protected JavaPairRDD<Pixel, Double> ApplyPhotoFilter(JavaSparkContext sparkContext)
throws Exception {
logger.info("[Sedona-VizViz][ApplyPhotoFilter][Start]");
if (this.parallelPhotoFilter) {
if (!this.hasBeenSpatialPartitioned) {
this.spatialPartitioningWithDuplicates();
this.hasBeenSpatialPartitioned = true;
}
this.distributedRasterCountMatrix =
this.distributedRasterCountMatrix.mapPartitionsToPair(
new PairFlatMapFunction<Iterator<Tuple2<Pixel, Double>>, Pixel, Double>() {
@Override
public Iterator<Tuple2<Pixel, Double>> call(
Iterator<Tuple2<Pixel, Double>> currentPartition) throws Exception {
// This function uses an efficient algorithm to recompute the pixel value. For
// each existing pixel,
// we calculate its impact for all pixels within its impact range and add its
// impact values.
HashMap<Pixel, Double> pixelCountHashMap = new HashMap<Pixel, Double>();
while (currentPartition.hasNext()) {
Tuple2<Pixel, Double> currentPixelCount = currentPartition.next();
Tuple2<Integer, Integer> centerPixelCoordinate =
new Tuple2<Integer, Integer>(
(int) currentPixelCount._1().getX(),
(int) currentPixelCount._1().getY());
if (centerPixelCoordinate._1() < 0
|| centerPixelCoordinate._1() >= resolutionX
|| centerPixelCoordinate._2() < 0
|| centerPixelCoordinate._2() >= resolutionY) {
// This center pixel is out of boundary so that we don't record its sum. We
// don't plot this pixel on the final sub image.
continue;
}
// Find all pixels that are in the working pixel's impact range
for (int x = -photoFilterRadius; x <= photoFilterRadius; x++) {
for (int y = -photoFilterRadius; y <= photoFilterRadius; y++) {
int neighborPixelX = centerPixelCoordinate._1 + x;
int neighborPixelY = centerPixelCoordinate._2 + y;
if (currentPixelCount._1().getCurrentPartitionId() < 0) {
throw new Exception(
"[VisualizationOperator][ApplyPhotoFilter] this pixel doesn't have currentPartitionId that is assigned in VisualizationPartitioner.");
}
if (neighborPixelX < 0
|| neighborPixelX >= resolutionX
|| neighborPixelY < 0
|| neighborPixelY >= resolutionY) {
// This neighbor pixel is out of boundary so that we don't record its sum.
// We don't plot this pixel on the final sub image.
continue;
}
if (VisualizationPartitioner.CalculatePartitionId(
resolutionX,
resolutionY,
partitionX,
partitionY,
neighborPixelX,
neighborPixelY)
!= currentPixelCount._1().getCurrentPartitionId()) {
// This neighbor pixel is not in this image partition so that we don't
// record its sum. We don't plot this pixel on the final sub image.
continue;
}
Double neighborPixelCount =
pixelCountHashMap.get(
new Pixel(
neighborPixelX, neighborPixelY, resolutionX, resolutionY));
// For that pixel, sum up its new count
if (neighborPixelCount != null) {
neighborPixelCount +=
currentPixelCount._2()
* PhotoFilterConvolutionMatrix[x + photoFilterRadius][
y + photoFilterRadius];
pixelCountHashMap.remove(
new Pixel(neighborPixelX, neighborPixelY, resolutionX, resolutionY));
Pixel newPixel =
new Pixel(
neighborPixelX,
neighborPixelY,
resolutionX,
resolutionY,
false,
VisualizationPartitioner.CalculatePartitionId(
resolutionX,
resolutionY,
partitionX,
partitionY,
neighborPixelX,
neighborPixelY));
pixelCountHashMap.put(newPixel, neighborPixelCount);
} else {
neighborPixelCount =
currentPixelCount._2()
* PhotoFilterConvolutionMatrix[x + photoFilterRadius][
y + photoFilterRadius];
Pixel newPixel =
new Pixel(
neighborPixelX,
neighborPixelY,
resolutionX,
resolutionY,
false,
VisualizationPartitioner.CalculatePartitionId(
resolutionX,
resolutionY,
partitionX,
partitionY,
neighborPixelX,
neighborPixelY));
pixelCountHashMap.put(newPixel, neighborPixelCount);
}
}
}
}
// Loop over the result map and convert the map. This is not efficient and can be
// replaced by a better way.
List<Tuple2<Pixel, Double>> resultSet = new ArrayList<Tuple2<Pixel, Double>>();
Iterator<java.util.Map.Entry<Pixel, Double>> hashmapIterator =
pixelCountHashMap.entrySet().iterator();
while (hashmapIterator.hasNext()) {
Map.Entry<Pixel, Double> cursorEntry = hashmapIterator.next();
resultSet.add(
new Tuple2<Pixel, Double>(cursorEntry.getKey(), cursorEntry.getValue()));
}
return resultSet.iterator();
}
});
} else {
/*
List<Tuple2<Pixel,Double>> collectedCountMatrix = this.distributedRasterCountMatrix.collect();
HashMap<Pixel,Double> hashCountMatrix = new HashMap<Pixel,Double>();
for(Tuple2<Pixel,Double> pixelCount:collectedCountMatrix)
{
hashCountMatrix.put(pixelCount._1(),pixelCount._2());
}
//List<Tuple2<Pixel,Double>> originalCountMatrix = new ArrayList<Tuple2<Pixel,Double>>(collectedCountMatrix);
//Collections.sort(originalCountMatrix, new PixelSerialIdComparator());
final Broadcast<HashMap<Pixel,Double>> broadcastCountMatrix = sparkContext.broadcast(hashCountMatrix);
this.distributedRasterCountMatrix = this.distributedRasterCountMatrix.mapToPair(new PairFunction<Tuple2<Pixel,Double>,Pixel,Double>()
{
@Override
public Tuple2<Pixel, Double> call(Tuple2<Pixel, Double> pixel) throws Exception {
Tuple2<Integer,Integer> centerPixelCoordinate = new Tuple2<Integer, Integer>(pixel._1().getX(),pixel._1().getY());
Double pixelCount = new Double(0.0);
for (int x = -photoFilterRadius; x <= photoFilterRadius; x++) {
for (int y = -photoFilterRadius; y <= photoFilterRadius; y++) {
int neighborPixelX = centerPixelCoordinate._1+x;
int neighborPixelY = centerPixelCoordinate._2+y;
if(neighborPixelX<resolutionX&&neighborPixelX>=0&&neighborPixelY<resolutionY&&neighborPixelY>=0)
{
Double neighborPixelCount = broadcastCountMatrix.getValue().get(new Pixel(neighborPixelX,neighborPixelY,resolutionX,resolutionY));
if (neighborPixelCount==null)
{
// Some of its neighbors are blank.
neighborPixelCount=0.0;
}
pixelCount+=neighborPixelCount*PhotoFilterConvolutionMatrix[x + photoFilterRadius][y + photoFilterRadius];
}
}
}
return new Tuple2<Pixel,Double>(pixel._1,pixelCount);
}
});
*/
// Spread the impact of this pixel
this.distributedRasterCountMatrix =
this.distributedRasterCountMatrix.flatMapToPair(
new PairFlatMapFunction<Tuple2<Pixel, Double>, Pixel, Double>() {
@Override
public Iterator<Tuple2<Pixel, Double>> call(Tuple2<Pixel, Double> pixelCount)
throws Exception {
Tuple2<Integer, Integer> centerPixelCoordinate =
new Tuple2<Integer, Integer>(
(int) pixelCount._1().getX(), (int) pixelCount._1().getY());
List<Tuple2<Pixel, Double>> result = new ArrayList<Tuple2<Pixel, Double>>();
for (int x = -photoFilterRadius; x <= photoFilterRadius; x++) {
for (int y = -photoFilterRadius; y <= photoFilterRadius; y++) {
int neighborPixelX = centerPixelCoordinate._1 + x;
int neighborPixelY = centerPixelCoordinate._2 + y;
Double pixelCountValue = 0.0;
if (neighborPixelX >= resolutionX
|| neighborPixelX < 0
|| neighborPixelY >= resolutionY
|| neighborPixelY < 0) {
continue;
}
pixelCountValue =
pixelCount._2()
* PhotoFilterConvolutionMatrix[x + photoFilterRadius][
y + photoFilterRadius];
result.add(
new Tuple2<Pixel, Double>(
new Pixel(neighborPixelX, neighborPixelY, resolutionX, resolutionY),
pixelCountValue));
}
}
return result.iterator();
}
});
this.distributedRasterCountMatrix =
this.distributedRasterCountMatrix.reduceByKey(
new Function2<Double, Double, Double>() {
@Override
public Double call(Double count1, Double count2) throws Exception {
return count1 + count2;
}
});
}
logger.info("[Sedona-VizViz][ApplyPhotoFilter][Stop]");
return this.distributedRasterCountMatrix;
}