v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/utils/FirestoreConverters.java [479:532]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    public PCollectionTuple expand(PCollection<Entity> entities) {
      // Stage 1: pair each entity with the deterministic byte serialization of its key, so that
      // entities carrying the same key end up in the same group.
      PCollection<KV<byte[], Entity>> keyedEntities =
          entities.apply(
              "ExposeKeys",
              ParDo.of(
                  new DoFn<Entity, KV<byte[], Entity>>() {
                    @ProcessElement
                    public void processElement(ProcessContext c) throws IOException {
                      Entity source = c.element();

                      ByteArrayOutputStream keyBuffer = new ByteArrayOutputStream();
                      CodedOutputStream keyWriter = CodedOutputStream.newInstance(keyBuffer);
                      keyWriter.useDeterministicSerialization();
                      source.getKey().writeTo(keyWriter);
                      // Flush so every written byte reaches the buffer before it is read.
                      keyWriter.flush();

                      byte[] serializedKey = keyBuffer.toByteArray();
                      c.output(KV.of(serializedKey, source));
                    }
                  }));

      // Stage 2: collect all entities that share a serialized key.
      PCollection<KV<byte[], Iterable<Entity>>> groupedByKey =
          keyedEntities.apply(GroupByKey.create());

      // Stage 3: a group of exactly one entity goes to the main output (goodTag); every member of
      // a larger group is reported as JSON on errorTag and counted in duplicatedKeys.
      return groupedByKey.apply(
          "ErrorOnDuplicateKeys",
          ParDo.of(
                  new DoFn<KV<byte[], Iterable<Entity>>, Entity>() {
                    private EntityJsonPrinter entityJsonPrinter;

                    @Setup
                    public void setup() {
                      entityJsonPrinter = new EntityJsonPrinter();
                    }

                    @ProcessElement
                    public void processElement(ProcessContext c) throws IOException {
                      Iterator<Entity> members = c.element().getValue().iterator();
                      Entity current = members.next();

                      // Unique key: pass the entity straight through.
                      if (!members.hasNext()) {
                        c.output(current);
                        return;
                      }

                      // Duplicate key: report every member of the group, including the first.
                      while (true) {
                        duplicatedKeys.inc();
                        ErrorMessage duplicateReport =
                            ErrorMessage.newBuilder()
                                .setMessage("Duplicate Datastore Key")
                                .setData(entityJsonPrinter.print(current))
                                .build();
                        c.output(errorTag(), duplicateReport.toJson());

                        if (!members.hasNext()) {
                          break;
                        }
                        current = members.next();
                        // A null element ends the reporting loop.
                        if (current == null) {
                          break;
                        }
                      }
                    }
                  })
              .withOutputTags(goodTag(), TupleTagList.of(errorTag())));
    }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



v1/src/main/java/com/google/cloud/teleport/templates/common/DatastoreConverters.java [462:515]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    public PCollectionTuple expand(PCollection<Entity> entities) {
      // Stage 1: pair each entity with the deterministic byte serialization of its key, so that
      // entities carrying the same key end up in the same group.
      PCollection<KV<byte[], Entity>> keyedEntities =
          entities.apply(
              "ExposeKeys",
              ParDo.of(
                  new DoFn<Entity, KV<byte[], Entity>>() {
                    @ProcessElement
                    public void processElement(ProcessContext c) throws IOException {
                      Entity source = c.element();

                      ByteArrayOutputStream keyBuffer = new ByteArrayOutputStream();
                      CodedOutputStream keyWriter = CodedOutputStream.newInstance(keyBuffer);
                      keyWriter.useDeterministicSerialization();
                      source.getKey().writeTo(keyWriter);
                      // Flush so every written byte reaches the buffer before it is read.
                      keyWriter.flush();

                      byte[] serializedKey = keyBuffer.toByteArray();
                      c.output(KV.of(serializedKey, source));
                    }
                  }));

      // Stage 2: collect all entities that share a serialized key.
      PCollection<KV<byte[], Iterable<Entity>>> groupedByKey =
          keyedEntities.apply(GroupByKey.create());

      // Stage 3: a group of exactly one entity goes to the main output (goodTag); every member of
      // a larger group is reported as JSON on errorTag and counted in duplicatedKeys.
      return groupedByKey.apply(
          "ErrorOnDuplicateKeys",
          ParDo.of(
                  new DoFn<KV<byte[], Iterable<Entity>>, Entity>() {
                    private EntityJsonPrinter entityJsonPrinter;

                    @Setup
                    public void setup() {
                      entityJsonPrinter = new EntityJsonPrinter();
                    }

                    @ProcessElement
                    public void processElement(ProcessContext c) throws IOException {
                      Iterator<Entity> members = c.element().getValue().iterator();
                      Entity current = members.next();

                      // Unique key: pass the entity straight through.
                      if (!members.hasNext()) {
                        c.output(current);
                        return;
                      }

                      // Duplicate key: report every member of the group, including the first.
                      while (true) {
                        duplicatedKeys.inc();
                        ErrorMessage duplicateReport =
                            ErrorMessage.newBuilder()
                                .setMessage("Duplicate Datastore Key")
                                .setData(entityJsonPrinter.print(current))
                                .build();
                        c.output(errorTag(), duplicateReport.toJson());

                        if (!members.hasNext()) {
                          break;
                        }
                        current = members.next();
                        // A null element ends the reporting loop.
                        if (current == null) {
                          break;
                        }
                      }
                    }
                  })
              .withOutputTags(goodTag(), TupleTagList.of(errorTag())));
    }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



