in plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java [107:238]
public void start(Map<String, String> props) {
    AbstractConfig config = new AbstractConfig(CONFIG_DEF, props);
    String connectionName = config.getString(Constants.CONNECTION_NAME_CONFIG);
    String plc4xConnectionString = config.getString(Constants.CONNECTION_STRING_CONFIG);
    pollReturnInterval = config.getInt(Constants.KAFKA_POLL_RETURN_CONFIG);
    Integer bufferSize = config.getInt(Constants.BUFFER_SIZE_CONFIG);
    // Maps each job name to the Kafka topic its results are published to.
    Map<String, String> topics = new HashMap<>();
    // Create a buffer with a capacity of BUFFER_SIZE_CONFIG elements, which schedules access in a fair way.
    buffer = new ArrayBlockingQueue<>(bufferSize, true);
    ScraperConfigurationTriggeredImplBuilder builder = new ScraperConfigurationTriggeredImplBuilder();
    builder.addSource(connectionName, plc4xConnectionString);
    // Each entry of QUERIES_CONFIG describes one scrape job.
    List<String> jobConfigs = config.getList(Constants.QUERIES_CONFIG);
    for (String jobConfig : jobConfigs) {
        String[] jobConfigSegments = jobConfig.split("\\|");
        if (jobConfigSegments.length < 4) {
            log.warn("Error in job configuration '{}'. " +
                "The configuration expects at least 4 segments: " +
                "{job-name}|{topic}|{rate}(|{tag-alias}#{tag-address})+", jobConfig);
            continue;
        }
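        // A single (hypothetical) job configuration entry following the format above might be:
        //   "machineData|machine-topic|1000|running#%Q0.0:BOOL|temperature#%DB1.DBD0:REAL"
        // -> job-name "machineData", Kafka topic "machine-topic", a rate of 1000,
        //    and two {tag-alias}#{tag-address} pairs (the addresses are made-up S7 examples).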
        String jobName = jobConfigSegments[0];
        String topic = jobConfigSegments[1];
        // Note: a non-numeric rate segment will throw a NumberFormatException here.
        Integer rate = Integer.valueOf(jobConfigSegments[2]);
        // Register the job with a scheduled trigger, e.g. "(SCHEDULED,1000)" for a rate of 1000.
        JobConfigurationTriggeredImplBuilder jobBuilder = builder.job(
            jobName, String.format("(SCHEDULED,%s)", rate)).source(connectionName);
        for (int i = 3; i < jobConfigSegments.length; i++) {
            String[] tagSegments = jobConfigSegments[i].split("#");
            if (tagSegments.length != 2) {
                log.warn("Error in job configuration '{}'. " +
                    "The tag segment expects the format {tag-alias}#{tag-address}, but got '{}'",
                    jobName, jobConfigSegments[i]);
                continue;
            }
            String tagAlias = tagSegments[0];
            String tagAddress = tagSegments[1];
            jobBuilder.tag(tagAlias, tagAddress);
            // Remember which Kafka topic the results of this job are published to.
            topics.put(jobName, topic);
        }
        jobBuilder.build();
    }
    ScraperConfigurationTriggeredImpl scraperConfig = builder.build();
    try {
        PlcConnectionManager connectionManager = CachedPlcConnectionManager.getBuilder().build();
        TriggerCollector triggerCollector = new TriggerCollectorImpl(connectionManager);
        // The scraper pushes every result set it collects into the buffer via this callback.
        scraper = new TriggeredScraperImpl(scraperConfig, (jobName, sourceName, results) -> {
            try {
                long timestamp = System.currentTimeMillis();
                // The source partition identifies this stream of records within Kafka Connect;
                // the offset is the timestamp at which the values were received.
                Map<String, String> sourcePartition = new HashMap<>();
                sourcePartition.put("sourceName", sourceName);
                sourcePartition.put("jobName", jobName);
                Map<String, Long> sourceOffset = Collections.singletonMap("offset", timestamp);
                String topic = topics.get(jobName);
                // Prepare the key structure.
                Struct key = new Struct(KEY_SCHEMA)
                    .put(Constants.SOURCE_NAME_FIELD, sourceName)
                    .put(Constants.JOB_NAME_FIELD, jobName);
                // Build the schema for the result struct.
                SchemaBuilder tagSchemaBuilder = SchemaBuilder.struct()
                    .name("org.apache.plc4x.kafka.schema.Tag");
                for (Map.Entry<String, Object> result : results.entrySet()) {
                    // Get tag name and value from the results.
                    String tagName = result.getKey();
                    Object tagValue = result.getValue();
                    // Get the schema for the given value type.
                    Schema valueSchema = getSchema(tagValue);
                    // Add the schema description for the current tag.
                    tagSchemaBuilder.field(tagName, valueSchema);
                }
                Schema tagSchema = tagSchemaBuilder.build();
                Schema recordSchema = SchemaBuilder.struct()
                    .name("org.apache.plc4x.kafka.schema.JobResult")
                    .doc("PLC job result. Contains all received PLC values as well as a receive timestamp.")
                    .field(Constants.TAGS_CONFIG, tagSchema)
                    .field(Constants.TIMESTAMP_CONFIG, Schema.INT64_SCHEMA)
                    .field(Constants.EXPIRES_CONFIG, Schema.OPTIONAL_INT64_SCHEMA)
                    .build();
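                // A record built from the hypothetical example job above would look roughly like:
                //   key:   { "sourceName": "machine", "jobName": "machineData" }
                //   value: { "tags": { "running": true, "temperature": 23.5 },
                //            "timestamp": 1700000000000, "expires": null }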
                // Build the struct itself.
                Struct tagStruct = new Struct(tagSchema);
                for (Map.Entry<String, Object> result : results.entrySet()) {
                    // Get tag name and value from the results.
                    String tagName = result.getKey();
                    Object tagValue = result.getValue();
                    if (tagSchema.field(tagName).schema().type() == Schema.Type.ARRAY) {
                        // Array tags arrive as a list of PlcValues: unwrap each element to its plain Java object.
                        tagStruct.put(tagName, ((List<?>) tagValue).stream()
                            .map(value -> ((PlcValue) value).getObject())
                            .collect(Collectors.toList()));
                    } else {
                        tagStruct.put(tagName, tagValue);
                    }
                }
                Struct recordStruct = new Struct(recordSchema)
                    .put(Constants.TAGS_CONFIG, tagStruct)
                    .put(Constants.TIMESTAMP_CONFIG, timestamp);
                // Prepare the source-record element.
                SourceRecord sourceRecord = new SourceRecord(
                    sourcePartition, sourceOffset,
                    topic,
                    KEY_SCHEMA, key,
                    recordSchema, recordStruct
                );
                // Add the new source-record to the buffer.
                buffer.add(sourceRecord);
            } catch (Exception e) {
                log.error("Error while parsing returned values", e);
            }
        }, triggerCollector);
        scraper.start();
        triggerCollector.start();
    } catch (ScraperException e) {
        log.error("Error starting the scraper", e);
    }
}
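
// For context: a minimal sketch of how the poll() side of this SourceTask might drain the
// buffer filled above. This is an illustrative assumption, not necessarily the class's
// actual poll() implementation; it only relies on the buffer and pollReturnInterval fields
// set in start(), the standard Kafka Connect SourceTask contract (poll() may return null
// when no data is available), and imports of java.util.LinkedList and
// java.util.concurrent.TimeUnit.
@Override
public List<SourceRecord> poll() throws InterruptedException {
    // Wait up to pollReturnInterval ms for the scraper callback to enqueue a record.
    SourceRecord first = buffer.poll(pollReturnInterval, TimeUnit.MILLISECONDS);
    if (first == null) {
        return null; // Nothing scraped yet; the Connect framework will call poll() again.
    }
    // Hand over the first record plus everything else that is already buffered, in one batch.
    List<SourceRecord> records = new LinkedList<>();
    records.add(first);
    buffer.drainTo(records);
    return records;
}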