public PCollectionTuple expand()

in v2/common/src/main/java/com/google/cloud/teleport/v2/transforms/PythonTextTransformer.java [415:536]


    /**
     * Applies the configured Python UDF to the input elements, one runtime invocation per bundle.
     *
     * <p>Each element's payload is wrapped as {@code {"id": <uuid>, "event": <payload>}} and
     * appended as one line to a per-bundle temporary manifest file. When the bundle finishes, the
     * manifest is handed to the Python runtime and every returned record is matched back to its
     * input element through {@code id}:
     *
     * <ul>
     *   <li>{@code status == "SUCCESS"}: the transformed {@code event} is emitted on the main
     *       output ({@code successTag()}).
     *   <li>{@code status == "FAILED"}: a copy of the input element carrying {@code error_message}
     *       is emitted on {@code failureTag()}.
     *   <li>any other status: the record is logged and dropped.
     * </ul>
     *
     * <p>Results are emitted at the end of the window the corresponding input element was seen in.
     * When no UDF is configured ({@code fileSystemPath()} or {@code functionName()} is null) the
     * elements are passed through unchanged on the main output.
     *
     * @param elements the elements whose payloads should be transformed
     * @return a tuple holding the {@code successTag()} and {@code failureTag()} outputs
     */
    public PCollectionTuple expand(PCollection<FailsafeElement<T, String>> elements) {
      return elements.apply(
          "ProcessUdf",
          ParDo.of(
                  new DoFn<FailsafeElement<T, String>, FailsafeElement<T, String>>() {
                    // Null when no UDF is configured; the DoFn then acts as a pass-through.
                    private PythonRuntime pythonRuntime;
                    // Intended cap on events per runtime invocation; not enforced yet (see TODOs).
                    private int batchLimit;
                    private int batchCounter;
                    // Input elements of the current bundle, keyed by the generated event id.
                    private HashMap<String, FailsafeElement<T, String>> futures;
                    // Window each buffered element arrived in, keyed by the same event id.
                    private HashMap<String, BoundedWindow> windows;
                    private File manifestFile;
                    private BufferedWriter dataWriter;

                    @Setup
                    public void setup()
                        throws IOException, NoSuchMethodException, InterruptedException {
                      String runtimeVersion = getPythonVersion();

                      if (fileSystemPath() == null || functionName() == null) {
                        LOG.warn(
                            "Not setting up a Python Mapper runtime, because "
                                + "fileSystemPath={} and functionName={}",
                            fileSystemPath(),
                            functionName());
                        return;
                      }

                      LOG.info("getting runtime!");
                      pythonRuntime =
                          getPythonRuntime(fileSystemPath(), functionName(), runtimeVersion);
                      LOG.info("Build Python Env for version {}", runtimeVersion);
                      pythonRuntime.buildPythonExecutable(runtimeVersion);
                    }

                    @StartBundle
                    public void startBundle(StartBundleContext context) throws IOException {
                      batchCounter = 0;
                      batchLimit = 1000;
                      futures = new HashMap<>();
                      windows = new HashMap<>();
                      if (pythonRuntime == null) {
                        // Pass-through mode: nothing is buffered, so no manifest is needed.
                        return;
                      }

                      String manifestId = UUID.randomUUID().toString();
                      manifestFile =
                          File.createTempFile(String.format("manifest_%s", manifestId), null);
                      try {
                        // NOTE(review): FileWriter uses the platform default charset; confirm it
                        // matches what the Python side expects when reading the manifest.
                        dataWriter =
                            new BufferedWriter(new FileWriter(manifestFile.getAbsolutePath()));
                      } catch (IOException e) {
                        // Do not leave an orphaned temp file behind if the writer cannot be opened.
                        deleteManifest();
                        throw e;
                      }
                      LOG.info("file is at {}", manifestFile.getAbsolutePath());

                      // initialize a temp file (non local) and a counter
                      // TODO: createFile function
                    }

                    @ProcessElement
                    public void processElement(ProcessContext context, BoundedWindow window)
                        throws IOException {
                      FailsafeElement<T, String> element = context.element();
                      if (pythonRuntime == null) {
                        // No UDF configured: forward the element untouched.
                        context.output(element);
                        return;
                      }

                      String eventId = UUID.randomUUID().toString();
                      JSONObject json = new JSONObject();
                      json.put("id", eventId);
                      json.put("event", new JSONObject(element.getPayload()));

                      futures.put(eventId, element);
                      // Remember the element's own window; a bundle may span several windows.
                      windows.put(eventId, window);
                      // TODO: add a counter of sum of total bytes
                      // 1) add event and increase counter
                      // 2) if counter > X process all of the rows

                      // No per-element flush: the writer is closed (and thereby flushed) before
                      // the manifest is handed to the runtime in finishBundle.
                      dataWriter.write(json.toString());
                      dataWriter.newLine();
                      batchCounter++;
                    }

                    // TODO: create an execute batch fn

                    @FinishBundle
                    public void finishBundle(FinishBundleContext context)
                        throws IOException, NoSuchMethodException, InterruptedException {
                      if (pythonRuntime == null) {
                        return;
                      }
                      try {
                        // Close first so every buffered line is on disk before the runtime reads.
                        dataWriter.close();
                        LOG.info("closing batch at {} events", batchCounter);
                        if (batchCounter == 0) {
                          // Empty bundle: nothing to transform.
                          return;
                        }

                        Integer retries = runtimeRetries();
                        LOG.info("executing the batch!!!");
                        List<String> results = pythonRuntime.invoke(manifestFile, retries);
                        LOG.info("processed {} number of records", results.size());
                        for (String result : results) {
                          emitResult(context, new JSONObject(result));
                        }
                        if (!futures.isEmpty()) {
                          LOG.warn(
                              "Python runtime returned no result for {} event(s)", futures.size());
                        }
                      } finally {
                        // Always release per-bundle state, even when the runtime call fails.
                        futures.clear();
                        windows.clear();
                        deleteManifest();
                      }
                    }

                    /** Routes one runtime result record to the success or failure output. */
                    private void emitResult(FinishBundleContext context, JSONObject result) {
                      String eventId = result.getString("id");
                      FailsafeElement<T, String> originalEvent = futures.remove(eventId);
                      BoundedWindow window = windows.remove(eventId);
                      if (originalEvent == null || window == null) {
                        LOG.warn("Dropping result with unknown or duplicate id {}", eventId);
                        return;
                      }

                      String status = result.getString("status");
                      if ("SUCCESS".equals(status)) {
                        String transformedEvent = result.getJSONObject("event").toString();
                        context.output(
                            FailsafeElement.of(originalEvent.getOriginalPayload(), transformedEvent),
                            window.maxTimestamp(),
                            window);
                        successCounter.inc();
                      } else if ("FAILED".equals(status)) {
                        String errorMessage = result.getString("error_message");
                        // Emit a copy so the input element itself is never mutated, and send it
                        // to the failure output rather than the main (success) output.
                        context.output(
                            failureTag(),
                            FailsafeElement.of(
                                    originalEvent.getOriginalPayload(), originalEvent.getPayload())
                                .setErrorMessage(errorMessage)
                                .setStacktrace(errorMessage),
                            window.maxTimestamp(),
                            window);
                      } else {
                        LOG.info("Failed to emit an event");
                        LOG.info("event status was {}", status);
                      }
                    }

                    /** Removes the bundle's temporary manifest file, logging if that fails. */
                    private void deleteManifest() {
                      if (manifestFile != null && !manifestFile.delete()) {
                        LOG.warn(
                            "Could not delete manifest file {}", manifestFile.getAbsolutePath());
                      }
                    }
                  })
              .withOutputTags(successTag(), TupleTagList.of(failureTag())));
    }