in v2/common/src/main/java/com/google/cloud/teleport/v2/transforms/PythonTextTransformer.java [415:536]
/**
 * Runs every element through the configured Python UDF in per-bundle batches.
 *
 * <p>Each element's payload is parsed as a JSON object, wrapped as {@code {"id": <uuid>, "event":
 * <payload>}} and appended as one line to a temporary manifest file. When the bundle finishes, the
 * manifest is passed to the Python runtime in a single invocation. Each returned record is routed by
 * its {@code status} field:
 *
 * <ul>
 *   <li>{@code SUCCESS}: emitted to {@code successTag()} with the returned {@code event} as payload;
 *   <li>{@code FAILED}: emitted to {@code failureTag()} with the returned {@code error_message};
 *   <li>any other status, or no result at all for a buffered element: emitted to {@code
 *       failureTag()} with a descriptive message, instead of being dropped.
 * </ul>
 *
 * <p>Every result is emitted into the window its input element arrived in, at that window's max
 * timestamp.
 *
 * @param elements input elements whose current payload is a JSON object string
 * @return a tuple holding the success (main) and failure outputs
 */
public PCollectionTuple expand(PCollection<FailsafeElement<T, String>> elements) {
  return elements.apply(
      "ProcessUdf",
      ParDo.of(
          new DoFn<FailsafeElement<T, String>, FailsafeElement<T, String>>() {
            private PythonRuntime pythonRuntime;
            // TODO: set per bundle but not read anywhere yet; intended cap on events per
            // Python invocation.
            private Integer batchLimit;
            private int batchCounter;
            // Buffered input elements of the current bundle, keyed by generated event id.
            private Map<String, FailsafeElement<T, String>> futures;
            // Window of each buffered element (same keys as futures), so that results are
            // emitted back into their own window rather than the last one seen in the bundle.
            private Map<String, BoundedWindow> windows;
            private String processUUID;
            private File manifestFile;
            private BufferedWriter dataWriter;

            /**
             * Creates the Python runtime and builds its executable, if both fileSystemPath and
             * functionName are configured; otherwise logs a warning and leaves the runtime unset.
             */
            @Setup
            public void setup()
                throws IOException, NoSuchMethodException, InterruptedException {
              String runtimeVersion = getPythonVersion();
              if (fileSystemPath() != null && functionName() != null) {
                LOG.info("getting runtime!");
                pythonRuntime =
                    getPythonRuntime(fileSystemPath(), functionName(), runtimeVersion);
                LOG.info("Build Python Env for version {}", runtimeVersion);
                pythonRuntime.buildPythonExecutable(runtimeVersion);
              } else {
                LOG.warn(
                    "Not setting up a Python Mapper runtime, because "
                        + "fileSystemPath={} and functionName={}",
                    fileSystemPath(),
                    functionName());
              }
            }

            /** Resets the per-bundle state and opens a fresh temporary manifest file. */
            @StartBundle
            public void startBundle(StartBundleContext context) throws IOException {
              batchCounter = 0;
              batchLimit = 1000;
              processUUID = UUID.randomUUID().toString();
              futures = new HashMap<>();
              windows = new HashMap<>();
              manifestFile =
                  File.createTempFile(String.format("manifest_%s", processUUID), null);
              dataWriter =
                  new BufferedWriter(new FileWriter(manifestFile.getAbsolutePath()));
              LOG.info("file is at {}", manifestFile.getAbsolutePath());
            }

            /**
             * Buffers one element: remembers it (and its window) under a fresh event id and
             * appends the wrapped payload as one manifest line.
             */
            @ProcessElement
            public void processElement(ProcessContext context, BoundedWindow window)
                throws IOException {
              FailsafeElement<T, String> element = context.element();
              String eventId = UUID.randomUUID().toString();
              JSONObject json = new JSONObject();
              json.put("id", eventId);
              json.put("event", new JSONObject(element.getPayload()));
              futures.put(eventId, element);
              windows.put(eventId, window);
              // No per-element flush: finishBundle closes (and thereby flushes) the writer
              // before handing the manifest to Python.
              // TODO: enforce batchLimit / a byte budget by executing the batch early.
              dataWriter.write(json.toString());
              dataWriter.newLine();
              batchCounter++;
            }

            /**
             * Runs the buffered batch through Python and routes every result. The manifest
             * file is always deleted and the per-bundle state always cleared, even if the
             * invocation throws.
             *
             * @throws IllegalStateException if elements were buffered but no Python runtime was
             *     set up
             */
            @FinishBundle
            public void finishBundle(FinishBundleContext context)
                throws IOException, NoSuchMethodException, InterruptedException {
              LOG.info("closing batch at {} events", batchCounter);
              try {
                // Closing flushes all buffered lines so Python reads the complete manifest.
                dataWriter.close();
                if (futures.isEmpty()) {
                  return;
                }
                if (pythonRuntime == null) {
                  throw new IllegalStateException(
                      "Python runtime was not set up (fileSystemPath="
                          + fileSystemPath()
                          + ", functionName="
                          + functionName()
                          + "); cannot process "
                          + futures.size()
                          + " buffered events");
                }
                LOG.info("executing the batch!!!");
                List<String> results = pythonRuntime.invoke(manifestFile, runtimeRetries());
                LOG.info("processed {} number of records", results.size());
                for (String result : results) {
                  emitResult(context, new JSONObject(result));
                }
                // emitResult removes every answered id; whatever is left got no result back
                // from Python and would otherwise be silently lost.
                for (Map.Entry<String, FailsafeElement<T, String>> entry : futures.entrySet()) {
                  emitFailure(
                      context,
                      entry.getValue(),
                      windows.get(entry.getKey()),
                      "Python UDF returned no result for event " + entry.getKey());
                }
              } finally {
                futures.clear();
                windows.clear();
                if (!manifestFile.delete()) {
                  LOG.warn("could not delete manifest file {}", manifestFile.getAbsolutePath());
                }
              }
            }

            /** Routes one Python result record to the success or failure output. */
            private void emitResult(FinishBundleContext context, JSONObject result) {
              String eventId = result.getString("id");
              FailsafeElement<T, String> original = futures.remove(eventId);
              BoundedWindow window = windows.remove(eventId);
              if (original == null) {
                LOG.warn("Python UDF returned unknown or duplicate event id {}", eventId);
                return;
              }
              String status = result.getString("status");
              if ("SUCCESS".equals(status)) {
                context.output(
                    FailsafeElement.of(
                        original.getOriginalPayload(), result.getJSONObject("event").toString()),
                    window.maxTimestamp(),
                    window);
                successCounter.inc();
              } else if ("FAILED".equals(status)) {
                emitFailure(context, original, window, result.getString("error_message"));
              } else {
                LOG.info("Failed to emit an event");
                LOG.info("event status was {}", status);
                emitFailure(
                    context, original, window, "Unexpected Python UDF status: " + status);
              }
            }

            /**
             * Emits a copy of {@code element} carrying {@code message} as both error message
             * and stacktrace to the failure output. A copy is emitted so the input element is
             * not mutated.
             */
            private void emitFailure(
                FinishBundleContext context,
                FailsafeElement<T, String> element,
                BoundedWindow window,
                String message) {
              context.output(
                  failureTag(),
                  FailsafeElement.of(element.getOriginalPayload(), element.getPayload())
                      .setErrorMessage(message)
                      .setStacktrace(message),
                  window.maxTimestamp(),
                  window);
            }
          })
          .withOutputTags(successTag(), TupleTagList.of(failureTag())));
}