protected JavaPairRDD ApplyPhotoFilter()

in spark/common/src/main/java/org/apache/sedona/viz/core/VisualizationOperator.java [313:524]


  protected JavaPairRDD<Pixel, Double> ApplyPhotoFilter(JavaSparkContext sparkContext)
      throws Exception {
    logger.info("[Sedona-VizViz][ApplyPhotoFilter][Start]");
    if (this.parallelPhotoFilter) {
      if (!this.hasBeenSpatialPartitioned) {
        this.spatialPartitioningWithDuplicates();
        this.hasBeenSpatialPartitioned = true;
      }

      this.distributedRasterCountMatrix =
          this.distributedRasterCountMatrix.mapPartitionsToPair(
              new PairFlatMapFunction<Iterator<Tuple2<Pixel, Double>>, Pixel, Double>() {

                @Override
                public Iterator<Tuple2<Pixel, Double>> call(
                    Iterator<Tuple2<Pixel, Double>> currentPartition) throws Exception {
                  // This function uses an efficient algorithm to recompute the pixel value. For
                  // each existing pixel,
                  // we calculate its impact for all pixels within its impact range and add its
                  // impact values.
                  HashMap<Pixel, Double> pixelCountHashMap = new HashMap<Pixel, Double>();
                  while (currentPartition.hasNext()) {
                    Tuple2<Pixel, Double> currentPixelCount = currentPartition.next();
                    Tuple2<Integer, Integer> centerPixelCoordinate =
                        new Tuple2<Integer, Integer>(
                            (int) currentPixelCount._1().getX(),
                            (int) currentPixelCount._1().getY());
                    if (centerPixelCoordinate._1() < 0
                        || centerPixelCoordinate._1() >= resolutionX
                        || centerPixelCoordinate._2() < 0
                        || centerPixelCoordinate._2() >= resolutionY) {
                      // This center pixel is out of boundary so that we don't record its sum. We
                      // don't plot this pixel on the final sub image.
                      continue;
                    }
                    // Find all pixels that are in the working pixel's impact range
                    for (int x = -photoFilterRadius; x <= photoFilterRadius; x++) {
                      for (int y = -photoFilterRadius; y <= photoFilterRadius; y++) {
                        int neighborPixelX = centerPixelCoordinate._1 + x;
                        int neighborPixelY = centerPixelCoordinate._2 + y;
                        if (currentPixelCount._1().getCurrentPartitionId() < 0) {
                          throw new Exception(
                              "[VisualizationOperator][ApplyPhotoFilter] this pixel doesn't have currentPartitionId that is assigned in VisualizationPartitioner.");
                        }
                        if (neighborPixelX < 0
                            || neighborPixelX >= resolutionX
                            || neighborPixelY < 0
                            || neighborPixelY >= resolutionY) {
                          // This neighbor pixel is out of boundary so that we don't record its sum.
                          // We don't plot this pixel on the final sub image.
                          continue;
                        }
                        if (VisualizationPartitioner.CalculatePartitionId(
                                resolutionX,
                                resolutionY,
                                partitionX,
                                partitionY,
                                neighborPixelX,
                                neighborPixelY)
                            != currentPixelCount._1().getCurrentPartitionId()) {
                          // This neighbor pixel is not in this image partition so that we don't
                          // record its sum. We don't plot this pixel on the final sub image.
                          continue;
                        }
                        Double neighborPixelCount =
                            pixelCountHashMap.get(
                                new Pixel(
                                    neighborPixelX, neighborPixelY, resolutionX, resolutionY));
                        // For that pixel, sum up its new count
                        if (neighborPixelCount != null) {
                          neighborPixelCount +=
                              currentPixelCount._2()
                                  * PhotoFilterConvolutionMatrix[x + photoFilterRadius][
                                      y + photoFilterRadius];
                          pixelCountHashMap.remove(
                              new Pixel(neighborPixelX, neighborPixelY, resolutionX, resolutionY));
                          Pixel newPixel =
                              new Pixel(
                                  neighborPixelX,
                                  neighborPixelY,
                                  resolutionX,
                                  resolutionY,
                                  false,
                                  VisualizationPartitioner.CalculatePartitionId(
                                      resolutionX,
                                      resolutionY,
                                      partitionX,
                                      partitionY,
                                      neighborPixelX,
                                      neighborPixelY));
                          pixelCountHashMap.put(newPixel, neighborPixelCount);
                        } else {
                          neighborPixelCount =
                              currentPixelCount._2()
                                  * PhotoFilterConvolutionMatrix[x + photoFilterRadius][
                                      y + photoFilterRadius];
                          Pixel newPixel =
                              new Pixel(
                                  neighborPixelX,
                                  neighborPixelY,
                                  resolutionX,
                                  resolutionY,
                                  false,
                                  VisualizationPartitioner.CalculatePartitionId(
                                      resolutionX,
                                      resolutionY,
                                      partitionX,
                                      partitionY,
                                      neighborPixelX,
                                      neighborPixelY));
                          pixelCountHashMap.put(newPixel, neighborPixelCount);
                        }
                      }
                    }
                  }
                  // Loop over the result map and convert the map. This is not efficient and can be
                  // replaced by a better way.
                  List<Tuple2<Pixel, Double>> resultSet = new ArrayList<Tuple2<Pixel, Double>>();
                  Iterator<java.util.Map.Entry<Pixel, Double>> hashmapIterator =
                      pixelCountHashMap.entrySet().iterator();
                  while (hashmapIterator.hasNext()) {
                    Map.Entry<Pixel, Double> cursorEntry = hashmapIterator.next();
                    resultSet.add(
                        new Tuple2<Pixel, Double>(cursorEntry.getKey(), cursorEntry.getValue()));
                  }
                  return resultSet.iterator();
                }
              });
    } else {
      /*
      List<Tuple2<Pixel,Double>> collectedCountMatrix = this.distributedRasterCountMatrix.collect();
      HashMap<Pixel,Double> hashCountMatrix = new HashMap<Pixel,Double>();
      for(Tuple2<Pixel,Double> pixelCount:collectedCountMatrix)
      {
          hashCountMatrix.put(pixelCount._1(),pixelCount._2());
      }
      //List<Tuple2<Pixel,Double>> originalCountMatrix = new ArrayList<Tuple2<Pixel,Double>>(collectedCountMatrix);
      //Collections.sort(originalCountMatrix, new PixelSerialIdComparator());
      final Broadcast<HashMap<Pixel,Double>> broadcastCountMatrix = sparkContext.broadcast(hashCountMatrix);
      this.distributedRasterCountMatrix = this.distributedRasterCountMatrix.mapToPair(new PairFunction<Tuple2<Pixel,Double>,Pixel,Double>()
      {
          @Override
          public Tuple2<Pixel, Double> call(Tuple2<Pixel, Double> pixel) throws Exception {
              Tuple2<Integer,Integer> centerPixelCoordinate = new Tuple2<Integer, Integer>(pixel._1().getX(),pixel._1().getY());
              Double pixelCount = new Double(0.0);
              for (int x = -photoFilterRadius; x <= photoFilterRadius; x++) {
                  for (int y = -photoFilterRadius; y <= photoFilterRadius; y++) {
                      int neighborPixelX = centerPixelCoordinate._1+x;
                      int neighborPixelY = centerPixelCoordinate._2+y;
                      if(neighborPixelX<resolutionX&&neighborPixelX>=0&&neighborPixelY<resolutionY&&neighborPixelY>=0)
                      {
                          Double neighborPixelCount = broadcastCountMatrix.getValue().get(new Pixel(neighborPixelX,neighborPixelY,resolutionX,resolutionY));
                          if (neighborPixelCount==null)
                          {
                              // Some of its neighbors are blank.
                              neighborPixelCount=0.0;
                          }
                          pixelCount+=neighborPixelCount*PhotoFilterConvolutionMatrix[x + photoFilterRadius][y + photoFilterRadius];
                      }
                  }
              }
              return new Tuple2<Pixel,Double>(pixel._1,pixelCount);
          }
      });
      */
      // Spread the impact of this pixel
      this.distributedRasterCountMatrix =
          this.distributedRasterCountMatrix.flatMapToPair(
              new PairFlatMapFunction<Tuple2<Pixel, Double>, Pixel, Double>() {
                @Override
                public Iterator<Tuple2<Pixel, Double>> call(Tuple2<Pixel, Double> pixelCount)
                    throws Exception {
                  Tuple2<Integer, Integer> centerPixelCoordinate =
                      new Tuple2<Integer, Integer>(
                          (int) pixelCount._1().getX(), (int) pixelCount._1().getY());
                  List<Tuple2<Pixel, Double>> result = new ArrayList<Tuple2<Pixel, Double>>();
                  for (int x = -photoFilterRadius; x <= photoFilterRadius; x++) {
                    for (int y = -photoFilterRadius; y <= photoFilterRadius; y++) {
                      int neighborPixelX = centerPixelCoordinate._1 + x;
                      int neighborPixelY = centerPixelCoordinate._2 + y;
                      Double pixelCountValue = 0.0;
                      if (neighborPixelX >= resolutionX
                          || neighborPixelX < 0
                          || neighborPixelY >= resolutionY
                          || neighborPixelY < 0) {
                        continue;
                      }
                      pixelCountValue =
                          pixelCount._2()
                              * PhotoFilterConvolutionMatrix[x + photoFilterRadius][
                                  y + photoFilterRadius];
                      result.add(
                          new Tuple2<Pixel, Double>(
                              new Pixel(neighborPixelX, neighborPixelY, resolutionX, resolutionY),
                              pixelCountValue));
                    }
                  }
                  return result.iterator();
                }
              });
      this.distributedRasterCountMatrix =
          this.distributedRasterCountMatrix.reduceByKey(
              new Function2<Double, Double, Double>() {
                @Override
                public Double call(Double count1, Double count2) throws Exception {
                  return count1 + count2;
                }
              });
    }
    logger.info("[Sedona-VizViz][ApplyPhotoFilter][Stop]");
    return this.distributedRasterCountMatrix;
  }