public void open()

in plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/subscription/Plc4xListenerDispatcher.java [75:117]


    public void open(String plcConnectionString, Map<String, String> tags) throws PlcConnectionException, Exception {
        connection = connectionManager.getConnection(plcConnectionString);

        if (!connection.getMetadata().isSubscribeSupported()) {
            throw new PlcProtocolException("This connection does not support subscription");
        }

        PlcSubscriptionRequest.Builder builder = connection.subscriptionRequestBuilder();

        for (Map.Entry<String, String> entry : tags.entrySet()) {
            switch (subscriptionType) {
                case CHANGE:
                    builder.addChangeOfStateTagAddress(entry.getKey(), entry.getValue());
                    break;
                case CYCLIC:
                    builder.addCyclicTagAddress(entry.getKey(), entry.getValue(), Duration.ofMillis(cyclingPollingInterval));
                    break;
                case EVENT:
                    builder.addEventTagAddress(entry.getKey(), entry.getValue());
            }
        }
        PlcSubscriptionRequest subscriptionRequest = builder.build();
        PlcSubscriptionResponse subscriptionResponse;
        try {
            subscriptionResponse = subscriptionRequest.execute().get(timeout, TimeUnit.MILLISECONDS);
            
        } catch (InterruptedException e) {
            logger.error("InterruptedException reading the data from PLC", e);
            throw e;
        } catch (TimeoutException e) {
            logger.error("Timeout connection to PLC", e);
            throw e;
        } catch (Exception e) {
            logger.error("Exception reading the data from PLC", e);
            throw (e instanceof ProcessException) ? (ProcessException) e : new ProcessException(e);
        }

        for (PlcSubscriptionHandle handle : subscriptionResponse.getSubscriptionHandles()) {
            handle.register(queuedEvents::offer);
        }

        running = true;
    }