in plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceRecordProcessor.java [95:201]
/**
 * Reads the configured tags from a PLC and writes the response as records into a new FlowFile.
 *
 * <p>If the processor has an incoming connection, one FlowFile is pulled from the session and the
 * result FlowFile is created as its child; otherwise a fresh FlowFile is created. On success the
 * result FlowFile receives the row count and execution time attributes (plus whatever the writer
 * contributes), a provenance event is reported, the incoming FlowFile (if any) is removed and the
 * result is transferred to {@code REL_SUCCESS}.
 *
 * <p>On failure the incoming FlowFile (if any) is tagged with the {@code EXCEPTION} attribute and
 * transferred to {@code REL_FAILURE}, the result FlowFile is removed, the session is committed
 * asynchronously and the error is rethrown as a {@link ProcessException}.
 *
 * @param context the process context used to resolve properties and connection information
 * @param session the session used to create, modify and transfer FlowFiles
 * @throws ProcessException if the connection cannot be obtained or the read fails or times out
 */
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    FlowFile fileToProcess = null;
    if (context.hasIncomingConnection()) {
        fileToProcess = session.get();
        // Nothing was pulled although a non-loop connection exists: nothing to do on this trigger.
        if (fileToProcess == null && context.hasNonLoopConnection()) {
            return;
        }
    }

    final ComponentLog logger = getLogger();
    final AtomicLong nrOfRows = new AtomicLong(0L);
    final StopWatch executeTime = new StopWatch(true);

    // Session operations return the updated FlowFile; always continue with the returned reference.
    FlowFile resultSetFF;
    if (fileToProcess == null) {
        resultSetFF = session.create();
    } else {
        resultSetFF = session.create(fileToProcess);
        resultSetFF = session.putAttribute(resultSetFF, INPUT_FLOWFILE_UUID,
            fileToProcess.getAttribute(CoreAttributes.UUID.key()));
    }

    // Final copy of the incoming FlowFile so that it can be referenced from inside the write callback.
    final FlowFile originalFlowFile = fileToProcess;
    final Plc4xWriter plc4xWriter = new RecordPlc4xWriter(
        context.getProperty(PLC_RECORD_WRITER_FACTORY).asControllerService(RecordSetWriterFactory.class),
        fileToProcess == null ? Collections.emptyMap() : fileToProcess.getAttributes());

    try {
        resultSetFF = session.write(resultSetFF, out -> {
            final Map<String, String> addressMap = getPlcAddressMap(context, originalFlowFile);
            final RecordSchema recordSchema = getSchemaCache().retrieveSchema(addressMap);
            final Map<String, PlcTag> tags = getSchemaCache().retrieveTags(addressMap);
            PlcReadRequest readRequest;
            long nrOfRowsHere;
            try (PlcConnection connection = getConnectionManager().getConnection(getConnectionString(context, originalFlowFile))) {
                readRequest = getReadRequest(logger, addressMap, tags, connection);
                PlcReadResponse readResponse = readRequest.execute().get(getTimeout(context, originalFlowFile), TimeUnit.MILLISECONDS);
                nrOfRowsHere = evaluateReadResponse(context, logger, originalFlowFile, plc4xWriter, out, recordSchema, readResponse);
            } catch (TimeoutException e) {
                logger.error("Timeout reading the data from PLC", e);
                // Drop the cached connection after a timeout so it is not handed out again.
                getConnectionManager().removeCachedConnection(getConnectionString(context, originalFlowFile));
                throw new ProcessException(e);
            } catch (PlcConnectionException e) {
                logger.error("Error getting the PLC connection", e);
                throw new ProcessException("Got an a PlcConnectionException while trying to get a connection", e);
            } catch (InterruptedException e) {
                logger.error("Exception reading the data from PLC", e);
                // Restore the interrupt flag so the calling thread can still observe the interruption.
                Thread.currentThread().interrupt();
                throw new ProcessException(e);
            } catch (Exception e) {
                logger.error("Exception reading the data from PLC", e);
                throw (e instanceof ProcessException) ? (ProcessException) e : new ProcessException(e);
            }

            // No schema was cached for this address map: store the one produced by this read.
            if (recordSchema == null) {
                if (debugEnabled) {
                    logger.debug("Adding PlcTypes resolution into cache with key: {}", addressMap);
                }
                getSchemaCache().addSchema(
                    addressMap,
                    readRequest.getTagNames(),
                    readRequest.getTags(),
                    plc4xWriter.getRecordSchema()
                );
            }
            nrOfRows.set(nrOfRowsHere);
        });
    } catch (Exception e) {
        logger.error("Exception reading the data from the PLC", e);
        if (fileToProcess != null) {
            fileToProcess = session.putAttribute(fileToProcess, EXCEPTION, e.getLocalizedMessage());
            session.transfer(fileToProcess, REL_FAILURE);
        }
        session.remove(resultSetFF);
        // Commit the failure routing before rethrowing the error to the caller.
        session.commitAsync();
        throw (e instanceof ProcessException) ? (ProcessException) e : new ProcessException(e);
    }

    plc4xWriter.updateCounters(session);
    final long executionTimeElapsed = executeTime.getElapsed(TimeUnit.MILLISECONDS);

    final Map<String, String> attributesToAdd = new HashMap<>();
    attributesToAdd.put(RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
    attributesToAdd.put(RESULT_QUERY_EXECUTION_TIME, String.valueOf(executionTimeElapsed));
    attributesToAdd.putAll(plc4xWriter.getAttributesToAdd());
    resultSetFF = session.putAllAttributes(resultSetFF, attributesToAdd);

    logger.info("{} contains {} records; transferring to 'success'", resultSetFF, nrOfRows.get());

    // Report a FETCH event when the processor has an incoming connection, otherwise a RECEIVE event.
    if (context.hasIncomingConnection()) {
        session.getProvenanceReporter().fetch(resultSetFF, "Retrieved " + nrOfRows.get() + " rows", executionTimeElapsed);
    } else {
        session.getProvenanceReporter().receive(resultSetFF, "Retrieved " + nrOfRows.get() + " rows", executionTimeElapsed);
    }

    if (fileToProcess != null) {
        session.remove(fileToProcess);
    }
    session.transfer(resultSetFF, REL_SUCCESS);
}