public void onTrigger()

in plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceRecordProcessor.java [95:201]


	/**
	 * Executes one PLC read and writes the response as records into a new FlowFile.
	 *
	 * <p>When an incoming FlowFile is available it becomes the parent of the result FlowFile, its
	 * UUID is stored in the {@code INPUT_FLOWFILE_UUID} attribute of the result, and it is removed
	 * from the session once the read succeeded. The result FlowFile is annotated with the row count
	 * and the elapsed execution time and transferred to {@code REL_SUCCESS}.
	 *
	 * <p>On any failure the incoming FlowFile (if present) receives the {@code EXCEPTION} attribute
	 * and is transferred to {@code REL_FAILURE}, the result FlowFile is removed, the session is
	 * committed asynchronously and the failure is rethrown as a {@link ProcessException}. If the
	 * failure was caused by an {@link InterruptedException}, the interrupt status of the current
	 * thread is restored before rethrowing.
	 *
	 * @param context the process context used to resolve properties and connection information
	 * @param session the session used to create, modify and route FlowFiles
	 * @throws ProcessException if the connection cannot be obtained, the read times out or any
	 *                          other error happens while reading or writing the response
	 */
	public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {

		FlowFile fileToProcess = null;
		if (context.hasIncomingConnection()) {
			fileToProcess = session.get();

			// Nothing queued on a non-loop incoming connection: wait for the next trigger.
			if (fileToProcess == null && context.hasNonLoopConnection()) {
				return;
			}
		}

		final ComponentLog logger = getLogger();

		// Row counter filled in by the write callback, and a stopwatch started right away
		// so the reported execution time covers the whole read.
		final AtomicLong nrOfRows = new AtomicLong(0L);
		final StopWatch executeTime = new StopWatch(true);

		// FlowFiles are immutable: always keep the reference returned by the session.
		FlowFile resultSetFF;
		if (fileToProcess == null) {
			resultSetFF = session.create();
		} else {
			resultSetFF = session.create(fileToProcess);
			resultSetFF = session.putAttribute(resultSetFF, INPUT_FLOWFILE_UUID, fileToProcess.getAttribute(CoreAttributes.UUID.key()));
		}

		// Effectively-final alias so the incoming FlowFile can be used inside the lambda.
		final FlowFile originalFlowFile = fileToProcess;

		final Plc4xWriter plc4xWriter = new RecordPlc4xWriter(context.getProperty(PLC_RECORD_WRITER_FACTORY).asControllerService(RecordSetWriterFactory.class),
			fileToProcess == null ? Collections.emptyMap() : fileToProcess.getAttributes());

		try {
			resultSetFF = session.write(resultSetFF, out -> {
				final Map<String,String> addressMap = getPlcAddressMap(context, originalFlowFile);
				final RecordSchema recordSchema = getSchemaCache().retrieveSchema(addressMap);
				final Map<String, PlcTag> tags = getSchemaCache().retrieveTags(addressMap);
				PlcReadRequest readRequest;
				long nrOfRowsHere;

				try (PlcConnection connection = getConnectionManager().getConnection(getConnectionString(context, originalFlowFile))) {

					readRequest = getReadRequest(logger, addressMap, tags, connection);

					PlcReadResponse readResponse = readRequest.execute().get(getTimeout(context, originalFlowFile), TimeUnit.MILLISECONDS);

					nrOfRowsHere = evaluateReadResponse(context, logger, originalFlowFile, plc4xWriter, out, recordSchema, readResponse);

				} catch (TimeoutException e) {
					logger.error("Timeout reading the data from PLC", e);
					// Drop the cached connection for this connection string after a timeout.
					getConnectionManager().removeCachedConnection(getConnectionString(context, originalFlowFile));
					throw new ProcessException(e);
				} catch (PlcConnectionException e) {
					logger.error("Error getting the PLC connection", e);
					throw new ProcessException("Got an a PlcConnectionException while trying to get a connection", e);
				} catch (Exception e) {
					// Also covers InterruptedException; the interrupt status is restored by the
					// outer handler once the session bookkeeping is done.
					logger.error("Exception reading the data from PLC", e);
					throw (e instanceof ProcessException) ? (ProcessException) e : new ProcessException(e);
				}

				// No schema was cached for this address map: store the one produced by this read.
				if (recordSchema == null) {
					if (debugEnabled) {
						logger.debug("Adding PlcTypes resolution into cache with key: " + addressMap);
					}
					getSchemaCache().addSchema(
						addressMap,
						readRequest.getTagNames(),
						readRequest.getTags(),
						plc4xWriter.getRecordSchema()
					);
				}
				nrOfRows.set(nrOfRowsHere);

			});

		} catch (Exception e) {
			logger.error("Exception reading the data from the PLC", e);
			if (fileToProcess != null) {
				fileToProcess = session.putAttribute(fileToProcess, EXCEPTION, e.getLocalizedMessage());
				session.transfer(fileToProcess, REL_FAILURE);
			}
			session.remove(resultSetFF);
			session.commitAsync();
			// Restore the interrupt status only now, so the session calls above are not
			// executed with a pending interrupt.
			if (isCausedByInterrupt(e)) {
				Thread.currentThread().interrupt();
			}
			throw (e instanceof ProcessException) ? (ProcessException) e : new ProcessException(e);
		}

		plc4xWriter.updateCounters(session);
		final long executionTimeElapsed = executeTime.getElapsed(TimeUnit.MILLISECONDS);
		final Map<String, String> attributesToAdd = new HashMap<>();
		attributesToAdd.put(RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
		attributesToAdd.put(RESULT_QUERY_EXECUTION_TIME, String.valueOf(executionTimeElapsed));
		attributesToAdd.putAll(plc4xWriter.getAttributesToAdd());

		resultSetFF = session.putAllAttributes(resultSetFF, attributesToAdd);

		logger.info("{} contains {} records; transferring to 'success'", resultSetFF, nrOfRows.get());

		// Report a FETCH event when the processor has an incoming connection, RECEIVE otherwise.
		if (context.hasIncomingConnection()) {
			session.getProvenanceReporter().fetch(resultSetFF, "Retrieved " + nrOfRows.get() + " rows", executionTimeElapsed);
		} else {
			session.getProvenanceReporter().receive(resultSetFF, "Retrieved " + nrOfRows.get() + " rows", executionTimeElapsed);
		}

		if (fileToProcess != null) {
			session.remove(fileToProcess);
		}
		session.transfer(resultSetFF, REL_SUCCESS);
	}

	/**
	 * Tells whether the given throwable is, or was caused by, an {@link InterruptedException}.
	 *
	 * @param throwable the failure to inspect, may be {@code null}
	 * @return {@code true} if an {@link InterruptedException} is found in the cause chain
	 */
	private static boolean isCausedByInterrupt(final Throwable throwable) {
		for (Throwable current = throwable; current != null; current = current.getCause()) {
			if (current instanceof InterruptedException) {
				return true;
			}
		}
		return false;
	}