in plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/subscription/Plc4xListenerDispatcher.java [75:117]
public void open(String plcConnectionString, Map<String, String> tags) throws PlcConnectionException, Exception {
connection = connectionManager.getConnection(plcConnectionString);
if (!connection.getMetadata().isSubscribeSupported()) {
throw new PlcProtocolException("This connection does not support subscription");
}
PlcSubscriptionRequest.Builder builder = connection.subscriptionRequestBuilder();
for (Map.Entry<String, String> entry : tags.entrySet()) {
switch (subscriptionType) {
case CHANGE:
builder.addChangeOfStateTagAddress(entry.getKey(), entry.getValue());
break;
case CYCLIC:
builder.addCyclicTagAddress(entry.getKey(), entry.getValue(), Duration.ofMillis(cyclingPollingInterval));
break;
case EVENT:
builder.addEventTagAddress(entry.getKey(), entry.getValue());
}
}
PlcSubscriptionRequest subscriptionRequest = builder.build();
PlcSubscriptionResponse subscriptionResponse;
try {
subscriptionResponse = subscriptionRequest.execute().get(timeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
logger.error("InterruptedException reading the data from PLC", e);
throw e;
} catch (TimeoutException e) {
logger.error("Timeout connection to PLC", e);
throw e;
} catch (Exception e) {
logger.error("Exception reading the data from PLC", e);
throw (e instanceof ProcessException) ? (ProcessException) e : new ProcessException(e);
}
for (PlcSubscriptionHandle handle : subscriptionResponse.getSubscriptionHandles()) {
handle.register(queuedEvents::offer);
}
running = true;
}