in athena-udfs-textanalytics/src/main/java/com/amazonaws/athena/udf/textanalytics/TextAnalyticsUDFHandler.java [1258:1299]
/**
 * Processes a whole batch of input rows with a single reflective call to the UDF method.
 *
 * <p>Each input field (column) is read row by row as strings, serialised to one JSON string with
 * {@code toJSON}, and passed to the UDF as one argument. The JSON string returned by the UDF is
 * expanded with {@code fromJSON}, and its entries are written, in row order, to the first field
 * of the output schema.
 *
 * @param allocator    allocator used to create the output block
 * @param udfMethod    UDF method invoked on this handler; it is called with one {@code String}
 *                     argument per input field and its return value is cast to {@code String}
 * @param inputRecords block holding the input rows; between 1 and 4 fields are supported
 * @param outputSchema schema of the output block; only its first field is populated
 * @return a block with the same row count as {@code inputRecords}, holding one result per row
 * @throws RuntimeException      if the number of input fields is not between 1 and 4
 * @throws IllegalStateException if the UDF returns fewer values than there are input rows
 * @throws Exception             if the reflective invocation or any helper fails
 */
protected Block processRows(BlockAllocator allocator, Method udfMethod, Block inputRecords, Schema outputSchema)
        throws Exception
{
    int rowCount = inputRecords.getRowCount();
    int fieldCount = inputRecords.getFieldReaders().size();

    // Reject unsupported arities before doing any per-cell work.
    if (fieldCount < 1 || fieldCount > 4) {
        throw new RuntimeException("Error: invalid argument count - " + fieldCount);
    }

    // input and output arrays serialised to JSON strings, to match the method signature declared in the UDF.
    String[] inputjson = new String[fieldCount];
    for (int fieldNum = 0; fieldNum < fieldCount; ++fieldNum) {
        String[] column = new String[rowCount];
        for (int rowNum = 0; rowNum < rowCount; ++rowNum) {
            column[rowNum] = this.getStringValue(inputRecords, fieldNum, rowNum);
        }
        inputjson[fieldNum] = toJSON(column);
    }

    // One JSON argument per input field; the cast makes invoke() spread the array as varargs
    // instead of treating it as a single argument.
    String resultjson = (String) udfMethod.invoke(this, (Object[]) inputjson);
    String[] result = fromJSON(resultjson);

    // Every input row needs a result; otherwise the copy loop below would fail with an
    // unexplained ArrayIndexOutOfBoundsException / NullPointerException.
    int resultCount = (result == null) ? 0 : result.length;
    if (resultCount < rowCount) {
        throw new IllegalStateException("UDF " + udfMethod.getName() + " returned " + resultCount
                + " value(s) for " + rowCount + " input row(s)");
    }

    Field outputField = outputSchema.getFields().get(0);
    String outputFieldName = outputField.getName();
    Block outputRecords = allocator.createBlock(outputSchema);
    outputRecords.setRowCount(rowCount);
    for (int rowNum = 0; rowNum < rowCount; ++rowNum) {
        outputRecords.setValue(outputFieldName, rowNum, result[rowNum]);
    }
    return outputRecords;
}