in plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java [162:267]
public void put(Collection<SinkRecord> records) {
if (records.isEmpty()) {
return;
}
PlcConnection connection = null;
try {
connection = connectionManager.getConnection(plc4xConnectionString);
} catch (PlcConnectionException e) {
log.warn("Failed to Open Connection {}", plc4xConnectionString);
remainingRetries--;
if (remainingRetries > 0) {
if (context != null) {
context.timeout(plc4xTimeout);
}
throw new RetriableException("Failed to Write to " + plc4xConnectionString + " retrying records that haven't expired");
}
log.warn("Failed to write after {} retries", plc4xRetries);
return;
}
PlcWriteRequest writeRequest;
final PlcWriteRequest.Builder builder = connection.writeRequestBuilder();
int validCount = 0;
for (SinkRecord r: records) {
Struct record = (Struct) r.value();
String topic = r.topic();
Struct plcTags = record.getStruct(Constants.TAGS_CONFIG);
Schema plcTagsSchema = plcTags.schema();
for (Field plcTag : plcTagsSchema.fields()) {
String tagName = plcTag.name();
Object value = plcTags.get(tagName);
if (value != null) {
Long timestamp = record.getInt64("timestamp");
Long expiresOffset = record.getInt64("expires");
Long expires = 0L;
if (expiresOffset != null) {
expires = expiresOffset + timestamp;
}
//Discard records we are not or no longer interested in.
if (!topic.equals(plc4xTopic) || plc4xTopic.equals("")) {
log.debug("Ignoring write request received on wrong topic");
} else if (!tags.containsKey(tagName)) {
log.warn("Unable to find address for tag " + tagName);
} else if ((System.currentTimeMillis() > expires) && !(expires == 0)) {
log.warn("Write request has expired {} - {}, discarding {}", expires, System.currentTimeMillis(), tagName);
} else {
String address = tags.get(tagName);
try {
//If an array value is passed instead of a single value then convert to a String array
if (value instanceof String) {
String sValue = (String) value;
if ((sValue.charAt(0) == '[') && (sValue.charAt(sValue.length() - 1) == ']')) {
String[] values = sValue.substring(1,sValue.length() - 1).split(",");
builder.addTagAddress(address, address, values);
} else {
builder.addTagAddress(address, address, value);
}
} else {
builder.addTagAddress(address, address, value);
}
validCount += 1;
} catch (Exception e) {
//When building a request we want to discard the write-operation if there is an error.
log.warn("Invalid Address format for protocol {}", address);
}
}
}
}
}
if (validCount > 0) {
try {
writeRequest = builder.build();
writeRequest.execute().get();
log.debug("Wrote records to {}", plc4xConnectionString);
} catch (Exception e) {
remainingRetries--;
if (remainingRetries > 0) {
if (context != null) {
context.timeout(plc4xTimeout);
}
try {
connection.close();
} catch (Exception f) {
log.warn("Failed to Close {} on RetryableException", plc4xConnectionString);
}
throw new RetriableException("Failed to Write to " + plc4xConnectionString + " retrying records that haven't expired");
}
log.warn("Failed to write after {} retries", plc4xRetries);
}
}
try {
connection.close();
} catch (Exception e) {
log.warn("Failed to Close {}", plc4xConnectionString);
}
remainingRetries = plc4xRetries;
}