in plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceProcessor.java [51:116]
/**
 * Reads the configured PLC addresses and emits the result as a newly created FlowFile.
 *
 * <p>Flow of one trigger:
 * <ol>
 *   <li>If the processor has incoming connections, take one FlowFile from the session; when
 *       none is queued and a non-loop connection exists, return without doing anything.</li>
 *   <li>Create the output FlowFile, obtain a connection from the connection manager and
 *       execute a read request, waiting at most the configured timeout (milliseconds).</li>
 *   <li>If the schema cache had no tags for the address map, store the tags resolved by
 *       the read request in the cache.</li>
 *   <li>On success, remove the incoming FlowFile (if any) and transfer the output FlowFile
 *       to {@code REL_SUCCESS}.</li>
 *   <li>On any failure, remove the output FlowFile, route the incoming FlowFile (if any) to
 *       {@code REL_FAILURE} with the {@code EXCEPTION} attribute set to the error message,
 *       commit the session asynchronously and rethrow.</li>
 * </ol>
 *
 * @param context the process context used to resolve connection string, addresses and timeout
 * @param session the session used to create, remove and transfer FlowFiles
 * @throws ProcessException if reading is unsupported by the connection, the read times out,
 *         is interrupted, or any other error occurs while talking to the PLC
 */
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    FlowFile incomingFlowFile = null;
    if (context.hasIncomingConnection()) {
        incomingFlowFile = session.get();
        // Nothing queued on a non-loop upstream connection: there is no work for this trigger.
        if (incomingFlowFile == null && context.hasNonLoopConnection()) {
            return;
        }
    }

    final ComponentLog logger = getLogger();
    final FlowFile flowFile = session.create();

    try (PlcConnection connection = getConnectionManager().getConnection(getConnectionString(context, incomingFlowFile))) {
        if (!connection.getMetadata().isReadSupported()) {
            throw new ProcessException("Reading not supported by connection");
        }

        final Map<String, String> addressMap = getPlcAddressMap(context, incomingFlowFile);
        // A null result means the cache has no entry for this address map yet (see the check below).
        final Map<String, PlcTag> tags = getSchemaCache().retrieveTags(addressMap);
        final PlcReadRequest readRequest = getReadRequest(logger, addressMap, tags, connection);

        try {
            final PlcReadResponse response = readRequest.execute().get(getTimeout(context, incomingFlowFile), TimeUnit.MILLISECONDS);
            evaluateReadResponse(session, flowFile, response);
        } catch (TimeoutException e) {
            logger.error("Timeout reading the data from PLC", e);
            // Drop the cached connection for this connection string after a timeout
            // (presumably so the next trigger does not reuse a stalled connection).
            getConnectionManager().removeCachedConnection(getConnectionString(context, incomingFlowFile));
            throw new ProcessException(e);
        } catch (InterruptedException e) {
            // Restore the interrupt status so callers further up the stack can observe it.
            Thread.currentThread().interrupt();
            logger.error("Exception reading the data from PLC", e);
            throw new ProcessException(e);
        } catch (Exception e) {
            logger.error("Exception reading the data from PLC", e);
            throw (e instanceof ProcessException) ? (ProcessException) e : new ProcessException(e);
        }

        // Update the cache BEFORE the session is finalised: if this (or closing the connection)
        // fails, the failure handler below must still be able to operate on FlowFiles that have
        // not yet been removed or transferred.
        if (tags == null) {
            if (debugEnabled) {
                logger.debug("Adding PlcTypes resolution into cache with key: " + addressMap);
            }
            getSchemaCache().addSchema(
                addressMap,
                readRequest.getTagNames(),
                readRequest.getTags(),
                null
            );
        }
    } catch (Exception e) {
        session.remove(flowFile);
        if (incomingFlowFile != null) {
            incomingFlowFile = session.putAttribute(incomingFlowFile, EXCEPTION, e.getLocalizedMessage());
            session.transfer(incomingFlowFile, REL_FAILURE);
        }
        // Commit before rethrowing so the failure routing above is part of the committed session.
        session.commitAsync();
        throw (e instanceof ProcessException) ? (ProcessException) e : new ProcessException(e);
    }

    // Reached only when the read, the cache update and the connection close all succeeded.
    // Finalising the session out here guarantees the failure handler above never sees a
    // FlowFile that was already removed or transferred.
    if (incomingFlowFile != null) {
        session.remove(incomingFlowFile);
    }
    session.transfer(flowFile, REL_SUCCESS);
}