protected Block processRows()

in athena-udfs-textanalytics/src/main/java/com/amazonaws/athena/udf/textanalytics/TextAnalyticsUDFHandler.java [1258:1299]


    /**
     * Applies a UDF to a whole batch of input rows with a single reflective call.
     *
     * <p>Each input field (column) is read as strings via {@code getStringValue}, serialised to one
     * JSON string with {@code toJSON}, and passed to {@code udfMethod} as one argument per field.
     * The JSON string returned by the UDF is decoded with {@code fromJSON} and written, row by row,
     * into the first field of a newly allocated output block.
     *
     * @param allocator    allocator used to create the output block
     * @param udfMethod    UDF method invoked on this handler; it is called with one String argument
     *                     per input field and its return value is cast to String
     * @param inputRecords block holding the input rows, one field per UDF argument
     * @param outputSchema schema of the output block; only its first field is populated
     * @return a block with the same row count as {@code inputRecords}, holding the UDF results
     * @throws RuntimeException if the input has no fields, or if the UDF returns fewer results
     *                          than there are input rows
     * @throws Exception        anything raised by the reflective invocation or by the helpers
     */
    protected Block processRows(BlockAllocator allocator, Method udfMethod, Block inputRecords, Schema outputSchema)
            throws Exception
    {
        int rowCount = inputRecords.getRowCount();
        System.out.println("DEBUG: inputRecords rowCount = " + rowCount);
        int fieldCount = inputRecords.getFieldReaders().size();
        System.out.println("DEBUG: inputRecords fieldCount = " + fieldCount);
        if (fieldCount < 1) {
            throw new RuntimeException("Error: invalid argument count - " + fieldCount);
        }

        // Each input column is serialised to a JSON string, to match the method signature declared in the UDF.
        String[] inputjson = new String[fieldCount];
        for (int fieldNum = 0; fieldNum < fieldCount; ++fieldNum) {
            String[] column = new String[rowCount];
            for (int rowNum = 0; rowNum < rowCount; ++rowNum) {
                column[rowNum] = this.getStringValue(inputRecords, fieldNum, rowNum);
            }
            inputjson[fieldNum] = toJSON(column);
        }

        // Pass the columns as the reflective argument array so any argument count is supported;
        // the cast to Object[] makes each element a separate argument rather than one array argument.
        String resultjson = (String) udfMethod.invoke(this, (Object[]) inputjson);
        String[] result = fromJSON(resultjson);

        // Fail with a descriptive message, before allocating the output block, when the UDF did
        // not produce one result per input row.
        int resultCount = (result == null) ? 0 : result.length;
        if (resultCount < rowCount) {
            throw new RuntimeException("Error: UDF " + udfMethod.getName() + " returned " + resultCount
                    + " results for " + rowCount + " input rows");
        }

        String outputFieldName = outputSchema.getFields().get(0).getName();
        Block outputRecords = allocator.createBlock(outputSchema);
        outputRecords.setRowCount(rowCount);
        for (int rowNum = 0; rowNum < rowCount; ++rowNum) {
            outputRecords.setValue(outputFieldName, rowNum, result[rowNum]);
        }
        return outputRecords;
    }