public void onTrigger()

in plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSinkProcessor.java [56:116]


    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        FlowFile flowFile = session.get();
        
        // Abort if there's nothing to do.
        if (flowFile == null) {
            return;
        }

        final ComponentLog logger = getLogger();

        try(PlcConnection connection = getConnectionManager().getConnection(getConnectionString(context, flowFile))) {
            if (!connection.getMetadata().isWriteSupported()) {
                throw new ProcessException("Writing not supported by connection");
            }
            
            final Map<String,String> addressMap = getPlcAddressMap(context, flowFile);
            final Map<String, PlcTag> tags = getSchemaCache().retrieveTags(addressMap);

            PlcWriteRequest writeRequest = getWriteRequest(logger, addressMap, tags, flowFile.getAttributes(), connection, null);

            try {
                final PlcWriteResponse plcWriteResponse = writeRequest.execute().get(getTimeout(context, flowFile), TimeUnit.MILLISECONDS);

                evaluateWriteResponse(logger, flowFile.getAttributes(), plcWriteResponse);

                // Store all configured tags in cache if not present
                if (tags == null) {
                    if (debugEnabled)
                        logger.debug("Adding PlcTypes resolution into cache with key: " + addressMap);

                    // Parse plc addresses
                    Map<String, PlcTag> validAddressesPlcTags = new LinkedHashMap<>();
                    for (Map.Entry<String, String> entry : addressMap.entrySet()) {
                        Optional<PlcTag> newTag = connection.parseTagAddress(entry.getValue());
                        newTag.ifPresent(parsed -> validAddressesPlcTags.put(entry.getKey(), parsed));
                    }

                    getSchemaCache().addSchema(
                            addressMap,
                            validAddressesPlcTags.keySet(),
                            new ArrayList<>(validAddressesPlcTags.values()),
                            null
                    );
                }
            } catch (TimeoutException e) {
                logger.error("Timeout writing the data to the PLC", e);
                getConnectionManager().removeCachedConnection(getConnectionString(context, flowFile));
                throw new ProcessException(e);
            } catch (Exception e) {
                logger.error("Exception writing the data to the PLC", e);
                throw (e instanceof ProcessException) ? (ProcessException) e : new ProcessException(e);
            }

            session.transfer(flowFile, REL_SUCCESS);
        } catch (Exception e) {
            flowFile = session.putAttribute(flowFile, EXCEPTION, e.getLocalizedMessage());
            session.transfer(flowFile, REL_FAILURE);
            session.commitAsync();
            throw (e instanceof ProcessException) ? (ProcessException) e : new ProcessException(e);
        }
    }