public void start()

in plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java [107:238]


    /**
     * Configures and starts the scraper that feeds this task's record buffer.
     *
     * <p>The properties are parsed via {@code CONFIG_DEF}. Every entry of the queries
     * setting describes one scrape job in the format
     * {@code {job-name}|{topic}|{rate}(|{tag-alias}#{tag-address})+}. Malformed jobs
     * (too few segments, non-numeric rate) and malformed tag segments are logged
     * and skipped, the remaining configuration is still applied.
     *
     * <p>For every result set delivered by the scraper a {@link SourceRecord} is
     * built (key: source- and job-name, value: tag values plus the receive
     * timestamp) and placed into the bounded buffer. If the buffer is full the
     * record is dropped and a warning is logged.
     *
     * @param props the configuration properties of this task
     */
    public void start(Map<String, String> props) {
        AbstractConfig config = new AbstractConfig(CONFIG_DEF, props);
        String connectionName = config.getString(Constants.CONNECTION_NAME_CONFIG);
        String plc4xConnectionString = config.getString(Constants.CONNECTION_STRING_CONFIG);
        pollReturnInterval = config.getInt(Constants.KAFKA_POLL_RETURN_CONFIG);
        Integer bufferSize = config.getInt(Constants.BUFFER_SIZE_CONFIG);

        // Maps job-name -> topic; filled completely before the scraper is started
        // and only read by the result handler afterwards.
        Map<String, String> topics = new HashMap<>();
        // Create a buffer with a capacity of BUFFER_SIZE_CONFIG elements which schedules access in a fair way.
        buffer = new ArrayBlockingQueue<>(bufferSize, true);

        ScraperConfigurationTriggeredImplBuilder builder = new ScraperConfigurationTriggeredImplBuilder();
        builder.addSource(connectionName, plc4xConnectionString);

        List<String> jobConfigs = config.getList(Constants.QUERIES_CONFIG);
        for (String jobConfig : jobConfigs) {
            String[] jobConfigSegments = jobConfig.split("\\|");
            if (jobConfigSegments.length < 4) {
                log.warn("Error in job configuration '{}'. " +
                    "The configuration expects at least 4 segments: " +
                    "{job-name}|{topic}|{rate}(|{tag-alias}#{tag-address})+", jobConfig);
                continue;
            }

            String jobName = jobConfigSegments[0];
            String topic = jobConfigSegments[1];

            // Validate the rate before touching the builder, so a broken job is
            // skipped like any other malformed entry instead of aborting start().
            int rate;
            try {
                rate = Integer.parseInt(jobConfigSegments[2]);
            } catch (NumberFormatException e) {
                log.warn("Error in job configuration '{}'. " +
                    "The rate segment expects an integer value, but got '{}'",
                    jobConfig, jobConfigSegments[2]);
                continue;
            }

            JobConfigurationTriggeredImplBuilder jobBuilder = builder.job(
                jobName, String.format("(SCHEDULED,%s)", rate)).source(connectionName);
            // Register the topic once per job, independent of how many tags are valid.
            topics.put(jobName, topic);
            for (int i = 3; i < jobConfigSegments.length; i++) {
                String[] tagSegments = jobConfigSegments[i].split("#");
                if (tagSegments.length != 2) {
                    log.warn("Error in job configuration '{}'. " +
                            "The tag segment expects a format {tag-alias}#{tag-address}, but got '{}'",
                        jobName, jobConfigSegments[i]);
                    continue;
                }
                String tagAlias = tagSegments[0];
                String tagAddress = tagSegments[1];
                jobBuilder.tag(tagAlias, tagAddress);
            }
            jobBuilder.build();
        }

        ScraperConfigurationTriggeredImpl scraperConfig = builder.build();

        try {
            PlcConnectionManager connectionManager = CachedPlcConnectionManager.getBuilder().build();
            TriggerCollector triggerCollector = new TriggerCollectorImpl(connectionManager);
            scraper = new TriggeredScraperImpl(scraperConfig, (jobName, sourceName, results) -> {
                try {
                    long timestamp = System.currentTimeMillis();

                    Map<String, String> sourcePartition = new HashMap<>();
                    sourcePartition.put("sourceName", sourceName);
                    sourcePartition.put("jobName", jobName);

                    // The receive timestamp doubles as the source offset.
                    Map<String, Long> sourceOffset = Collections.singletonMap("offset", timestamp);

                    String topic = topics.get(jobName);

                    // Prepare the key structure.
                    Struct key = new Struct(KEY_SCHEMA)
                        .put(Constants.SOURCE_NAME_FIELD, sourceName)
                        .put(Constants.JOB_NAME_FIELD, jobName);

                    // Build the Schema for the result struct.
                    SchemaBuilder tagSchemaBuilder = SchemaBuilder.struct()
                        .name("org.apache.plc4x.kafka.schema.Tag");

                    for (Map.Entry<String, Object> result : results.entrySet()) {
                        // Get tag-name and -value from the results.
                        String tagName = result.getKey();
                        Object tagValue = result.getValue();

                        // Get the schema for the given value type.
                        Schema valueSchema = getSchema(tagValue);

                        // Add the schema description for the current tag.
                        tagSchemaBuilder.field(tagName, valueSchema);
                    }
                    Schema tagSchema = tagSchemaBuilder.build();

                    Schema recordSchema = SchemaBuilder.struct()
                        .name("org.apache.plc4x.kafka.schema.JobResult")
                        .doc("PLC Job result. This contains all of the received PLCValues as well as a received timestamp")
                        .field(Constants.TAGS_CONFIG, tagSchema)
                        .field(Constants.TIMESTAMP_CONFIG, Schema.INT64_SCHEMA)
                        .field(Constants.EXPIRES_CONFIG, Schema.OPTIONAL_INT64_SCHEMA)
                        .build();

                    // Build the struct itself.
                    Struct tagStruct = new Struct(tagSchema);
                    for (Map.Entry<String, Object> result : results.entrySet()) {
                        // Get tag-name and -value from the results.
                        String tagName = result.getKey();
                        Object tagValue = result.getValue();

                        if (tagSchema.field(tagName).schema().type() == Schema.Type.ARRAY) {
                            // Array-typed tags carry a list of PlcValues; unwrap each element
                            // to its plain object representation before storing it.
                            List<?> plcValues = (List<?>) tagValue;
                            tagStruct.put(tagName, plcValues.stream()
                                .map(p -> ((PlcValue) p).getObject())
                                .collect(Collectors.toList()));
                        } else {
                            tagStruct.put(tagName, tagValue);
                        }
                    }

                    // The expires field is optional and intentionally left unset here.
                    Struct recordStruct = new Struct(recordSchema)
                        .put(Constants.TAGS_CONFIG, tagStruct)
                        .put(Constants.TIMESTAMP_CONFIG, timestamp);

                    // Prepare the source-record element.
                    SourceRecord sourceRecord = new SourceRecord(
                        sourcePartition, sourceOffset,
                        topic,
                        KEY_SCHEMA, key,
                        recordSchema, recordStruct
                    );

                    // Add the new source-record to the buffer. offer() is used instead of add()
                    // so a full buffer is reported as such and not as a parsing error.
                    if (!buffer.offer(sourceRecord)) {
                        log.warn("Buffer is full, dropping record of job '{}' from source '{}'",
                            jobName, sourceName);
                    }
                } catch (Exception e) {
                    // Catch-all on purpose: a single bad result must not break the scraper callback.
                    log.error("Error while parsing returned values", e);
                }
            }, triggerCollector);
            scraper.start();
            triggerCollector.start();
        } catch (ScraperException e) {
            // NOTE(review): the failure is only logged, so the task continues without a
            // running scraper - confirm whether this should be propagated to the caller.
            log.error("Error starting the scraper", e);
        }
    }