public void onTrigger()

in plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSinkRecordProcessor.java [102:203]


	public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
		FlowFile fileToProcess = session.get();

        // Abort if there's nothing to do.
        if (fileToProcess == null) {
            return;
        }
			
		final ComponentLog logger = getLogger();

		// Get an instance of a component able to read from a PLC.
		final AtomicLong nrOfRows = new AtomicLong(0L);
		final StopWatch executeTime = new StopWatch(true);

		try {
			session.read(fileToProcess, in -> {
				Record record = null;
			
				try (RecordReader recordReader = context.getProperty(PLC_RECORD_READER_FACTORY)
					.asControllerService(RecordReaderFactory.class)
					.createRecordReader(fileToProcess, in, logger)){

					while ((record = recordReader.nextRecord()) != null) {
						AtomicLong nrOfRowsHere = new AtomicLong(0);
						PlcWriteRequest writeRequest;

						final Map<String,String> addressMap = getPlcAddressMap(context, fileToProcess);
						final Map<String, PlcTag> tags = getSchemaCache().retrieveTags(addressMap);

						try (PlcConnection connection = getConnectionManager().getConnection(getConnectionString(context, fileToProcess))) {
							
							writeRequest = getWriteRequest(logger, addressMap, tags, record.toMap(), connection, nrOfRowsHere);

							PlcWriteResponse plcWriteResponse = writeRequest.execute().get(getTimeout(context, fileToProcess), TimeUnit.MILLISECONDS);

							// Response check if values were written
							evaluateWriteResponse(logger, record.toMap(), plcWriteResponse);

							// Store all configured tags in cache if not present
							if (tags == null) {
								if (debugEnabled)
									logger.debug("Adding PlcTypes resolution into cache with key: " + addressMap);

								// Parse plc addresses
								Map<String, PlcTag> validAddressesPlcTags = new LinkedHashMap<>();
								for (Map.Entry<String, String> entry : addressMap.entrySet()) {
									Optional<PlcTag> newTag = connection.parseTagAddress(entry.getValue());
									newTag.ifPresent(parsed -> validAddressesPlcTags.put(entry.getKey(), parsed));
								}

								getSchemaCache().addSchema(
										addressMap,
										validAddressesPlcTags.keySet(),
										new ArrayList<>(validAddressesPlcTags.values()),
										null
								);
							}
						} catch (TimeoutException e) {
							logger.error("Timeout writting the data to the PLC", e);
							getConnectionManager().removeCachedConnection(getConnectionString(context, fileToProcess));
							throw new ProcessException(e);
						} catch (PlcConnectionException e) {
							logger.error("Error getting the PLC connection", e);
							throw new ProcessException("Got an a PlcConnectionException while trying to get a connection", e);
						} catch (Exception e) {
							logger.error("Exception writting the data to the PLC", e);
							throw (e instanceof ProcessException) ? (ProcessException) e : new ProcessException(e);
						}

						nrOfRows.getAndAdd(nrOfRowsHere.get());
					}
				} catch (Exception e) {
					throw (e instanceof ProcessException) ? (ProcessException) e : new ProcessException(e);
				} 
			});

		} catch (ProcessException e) {
			logger.error("Exception writing the data to the PLC", e);
			session.putAttribute(fileToProcess, EXCEPTION, e.getLocalizedMessage());
			session.transfer(fileToProcess, REL_FAILURE);
			session.commitAsync();
			throw e;
		} 


		long executionTimeElapsed = executeTime.getElapsed(TimeUnit.MILLISECONDS);
		final Map<String, String> attributesToAdd = new HashMap<>();
		attributesToAdd.put(RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
		attributesToAdd.put(RESULT_QUERY_EXECUTION_TIME, String.valueOf(executionTimeElapsed));
		attributesToAdd.put(INPUT_FLOWFILE_UUID, fileToProcess.getAttribute(CoreAttributes.UUID.key()));
		
		session.putAllAttributes(fileToProcess, attributesToAdd);

		session.transfer(fileToProcess, REL_SUCCESS);
		
		logger.info("Writing {} fields from {} records; transferring to 'success'", nrOfRows.get(), fileToProcess);
		if (context.hasIncomingConnection()) {
			session.getProvenanceReporter().fetch(fileToProcess, "Writted " + nrOfRows.get() + " rows", executionTimeElapsed);
		} else {
			session.getProvenanceReporter().receive(fileToProcess, "Writted " + nrOfRows.get() + " rows", executionTimeElapsed);
		}
	}