public void put()

in plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java [162:267]


    /**
     * Writes the tag values carried by the given sink records to the PLC using one write request.
     *
     * <p>Every record value is read as a {@link Struct} containing a nested tags struct (under
     * {@code Constants.TAGS_CONFIG}), a {@code timestamp} and an optional {@code expires} offset.
     * A tag value is skipped (and logged) when it arrives on a topic other than the configured one,
     * when no address is configured for the tag, when it has expired, or when the request builder
     * rejects it. The request is only executed if at least one tag value was accepted.
     *
     * <p>Retry handling: each failure to connect or to write decrements {@code remainingRetries}.
     * While retries remain, the task context timeout is set and a {@link RetriableException} is
     * thrown. Once the retries are used up the batch is dropped with a warning and the counter is
     * reset to {@code plc4xRetries}, as it is after a successful write.
     *
     * <p>The connection obtained for this call is always closed before the method returns or throws.
     *
     * @param records the records to write; an empty collection is a no-op
     * @throws RetriableException if connecting or writing failed and retries remain
     */
    public void put(Collection<SinkRecord> records) {
        if (records.isEmpty()) {
            return;
        }

        PlcConnection connection;
        try {
            connection = connectionManager.getConnection(plc4xConnectionString);
        } catch (PlcConnectionException e) {
            log.warn("Failed to Open Connection {}", plc4xConnectionString, e);
            remainingRetries--;
            if (remainingRetries > 0) {
                if (context != null) {
                    context.timeout(plc4xTimeout);
                }
                throw new RetriableException("Failed to Write to " + plc4xConnectionString + " retrying records that haven't expired", e);
            }
            log.warn("Failed to write after {} retries", plc4xRetries);
            // This batch is abandoned; restore the retry budget so that the next batch is not
            // dropped on its very first failure (mirrors the write-failure path below).
            remainingRetries = plc4xRetries;
            return;
        }

        // Selects the close-failure log message; set just before the retriable exception is thrown.
        boolean retrying = false;
        try {
            final PlcWriteRequest.Builder builder = connection.writeRequestBuilder();
            int validCount = 0;
            for (SinkRecord r : records) {
                validCount += addRecordTagsToRequest(builder, r);
            }

            if (validCount > 0) {
                try {
                    PlcWriteRequest writeRequest = builder.build();
                    writeRequest.execute().get();
                    log.debug("Wrote records to {}", plc4xConnectionString);
                } catch (Exception e) {
                    if (e instanceof InterruptedException) {
                        // Do not swallow the interrupt; let the calling thread observe it.
                        Thread.currentThread().interrupt();
                    }
                    remainingRetries--;
                    if (remainingRetries > 0) {
                        if (context != null) {
                            context.timeout(plc4xTimeout);
                        }
                        retrying = true;
                        throw new RetriableException("Failed to Write to " + plc4xConnectionString + " retrying records that haven't expired", e);
                    }
                    log.warn("Failed to write after {} retries", plc4xRetries, e);
                }
            }
        } finally {
            // Runs on success, on the retriable failure and on any unexpected exception raised
            // while reading the records, so the connection is never left open.
            closeConnectionQuietly(connection, retrying ? "Failed to Close {} on RetryableException" : "Failed to Close {}");
        }

        remainingRetries = plc4xRetries;
    }

    /**
     * Adds every writable tag value of one record to the write request under construction.
     *
     * @param builder the write request builder to add tag values to
     * @param r the sink record whose value holds the tags struct
     * @return the number of tag values that were added to the builder
     */
    private int addRecordTagsToRequest(PlcWriteRequest.Builder builder, SinkRecord r) {
        Struct record = (Struct) r.value();
        String topic = r.topic();
        Struct plcTags = record.getStruct(Constants.TAGS_CONFIG);

        int added = 0;
        for (Field plcTag : plcTags.schema().fields()) {
            String tagName = plcTag.name();
            Object value = plcTags.get(tagName);
            if (value == null) {
                continue;
            }

            // An absent "expires" offset is represented as 0, which means "never expires".
            // NOTE(review): a record with "expires" set but a null "timestamp" fails on unboxing
            // here - confirm whether such records can occur.
            Long timestamp = record.getInt64("timestamp");
            Long expiresOffset = record.getInt64("expires");
            long expires = 0L;
            if (expiresOffset != null) {
                expires = expiresOffset + timestamp;
            }

            //Discard records we are not or no longer interested in.
            if (!topic.equals(plc4xTopic) || plc4xTopic.equals("")) {
                log.debug("Ignoring write request received on wrong topic");
                continue;
            }
            if (!tags.containsKey(tagName)) {
                log.warn("Unable to find address for tag {}", tagName);
                continue;
            }
            long now = System.currentTimeMillis();
            if (expires != 0 && now > expires) {
                log.warn("Write request has expired {} - {}, discarding {}", expires, now, tagName);
                continue;
            }

            String address = tags.get(tagName);
            try {
                //If an array value is passed instead of a single value then convert to a String array
                if (value instanceof String) {
                    String sValue = (String) value;
                    // startsWith/endsWith are safe for the empty string, unlike charAt(0).
                    if (sValue.startsWith("[") && sValue.endsWith("]")) {
                        String[] values = sValue.substring(1, sValue.length() - 1).split(",");
                        builder.addTagAddress(address, address, values);
                    } else {
                        builder.addTagAddress(address, address, value);
                    }
                } else {
                    builder.addTagAddress(address, address, value);
                }
                added++;
            } catch (Exception e) {
                //When building a request we want to discard the write-operation if there is an error.
                log.warn("Invalid Address format for protocol {}", address, e);
            }
        }
        return added;
    }

    /**
     * Closes the connection, logging instead of propagating any failure.
     *
     * @param connection the connection to close
     * @param failureMessage log format (one placeholder for the connection string) used on failure
     */
    private void closeConnectionQuietly(PlcConnection connection, String failureMessage) {
        try {
            connection.close();
        } catch (Exception e) {
            log.warn(failureMessage, plc4xConnectionString, e);
        }
    }