in plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSinkProcessor.java [56:116]
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
// Abort if there's nothing to do.
if (flowFile == null) {
return;
}
final ComponentLog logger = getLogger();
try(PlcConnection connection = getConnectionManager().getConnection(getConnectionString(context, flowFile))) {
if (!connection.getMetadata().isWriteSupported()) {
throw new ProcessException("Writing not supported by connection");
}
final Map<String,String> addressMap = getPlcAddressMap(context, flowFile);
final Map<String, PlcTag> tags = getSchemaCache().retrieveTags(addressMap);
PlcWriteRequest writeRequest = getWriteRequest(logger, addressMap, tags, flowFile.getAttributes(), connection, null);
try {
final PlcWriteResponse plcWriteResponse = writeRequest.execute().get(getTimeout(context, flowFile), TimeUnit.MILLISECONDS);
evaluateWriteResponse(logger, flowFile.getAttributes(), plcWriteResponse);
// Store all configured tags in cache if not present
if (tags == null) {
if (debugEnabled)
logger.debug("Adding PlcTypes resolution into cache with key: " + addressMap);
// Parse plc addresses
Map<String, PlcTag> validAddressesPlcTags = new LinkedHashMap<>();
for (Map.Entry<String, String> entry : addressMap.entrySet()) {
Optional<PlcTag> newTag = connection.parseTagAddress(entry.getValue());
newTag.ifPresent(parsed -> validAddressesPlcTags.put(entry.getKey(), parsed));
}
getSchemaCache().addSchema(
addressMap,
validAddressesPlcTags.keySet(),
new ArrayList<>(validAddressesPlcTags.values()),
null
);
}
} catch (TimeoutException e) {
logger.error("Timeout writing the data to the PLC", e);
getConnectionManager().removeCachedConnection(getConnectionString(context, flowFile));
throw new ProcessException(e);
} catch (Exception e) {
logger.error("Exception writing the data to the PLC", e);
throw (e instanceof ProcessException) ? (ProcessException) e : new ProcessException(e);
}
session.transfer(flowFile, REL_SUCCESS);
} catch (Exception e) {
flowFile = session.putAttribute(flowFile, EXCEPTION, e.getLocalizedMessage());
session.transfer(flowFile, REL_FAILURE);
session.commitAsync();
throw (e instanceof ProcessException) ? (ProcessException) e : new ProcessException(e);
}
}