private void flush()

in pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/sink/RedisSink.java [109:157]


    private void flush() {
        final Map<byte[], byte[]> recordsToSet = new ConcurrentHashMap<>();
        final List<Record<byte[]>> recordsToFlush;

        synchronized (this) {
            if (incomingList.isEmpty()) {
                return;
            }
            recordsToFlush = incomingList;
            incomingList = Lists.newArrayList();
        }

        if (CollectionUtils.isNotEmpty(recordsToFlush)) {
            for (Record<byte[]> record: recordsToFlush) {
                try {
                    // records with null keys or values will be ignored
                    byte[] key = record.getKey().isPresent() ? record.getKey().get().getBytes(StandardCharsets.UTF_8) : null;
                    byte[] value = record.getValue();
                    recordsToSet.put(key, value);
                } catch (Exception e) {
                    record.fail();
                    recordsToFlush.remove(record);
                    log.warn("Record flush thread was exception ", e);
                }
            }
        }

        try {
            if (recordsToSet.size() > 0) {
                if (log.isDebugEnabled()) {
                    log.debug("Calling mset with {} values", recordsToSet.size());
                }

                RedisFuture<?> future = redisSession.asyncCommands().mset(recordsToSet);

                if (!future.await(operationTimeoutMs, TimeUnit.MILLISECONDS) || future.getError() != null) {
                    log.warn("Operation failed with error {} or timeout {} is exceeded", future.getError(), operationTimeoutMs);
                    recordsToFlush.forEach(tRecord -> tRecord.fail());
                    return;
                }
            }
            recordsToFlush.forEach(tRecord -> tRecord.ack());
            recordsToSet.clear();
            recordsToFlush.clear();
        } catch (InterruptedException e) {
            recordsToFlush.forEach(tRecord -> tRecord.fail());
            log.error("Redis mset data interrupted exception ", e);
        }
    }