public abstract void bindValue()

in pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java [159:233]


    /**
     * Binds the contents of a record to a prepared statement so it can be executed.
     *
     * <p>{@code flush()} calls this once per queued record, immediately before
     * executing the statement it passed in.
     *
     * @param statement the prepared statement to populate; {@code flush()} passes the
     *                  delete, update or insert statement matching {@code action}
     * @param message   the record whose data should be bound to the statement
     * @param action    the action resolved for the record; {@code flush()} passes
     *                  {@code INSERT}, {@code UPDATE} or {@code DELETE}
     * @throws Exception if binding fails; {@code flush()} catches it and fails every
     *                   record of the batch being flushed
     */
    public abstract void bindValue(
        PreparedStatement statement,
        Record<T> message, String action) throws Exception;

    /**
     * Writes all queued records to the database as one transaction.
     *
     * <p>The method is a no-op when the incoming queue is empty or another flush is
     * already running. Otherwise it swaps the incoming queue with the (empty) swap
     * list, binds and executes one statement per record, commits, and acks every
     * record. If anything fails, the transaction is rolled back and every record in
     * the batch is failed, so a later commit cannot persist a partially applied batch.
     *
     * <p>The flushing flag is always released on exit, even when an unexpected
     * exception escapes, so a single failure cannot disable all future flushes.
     *
     * @throws IllegalStateException if the swap list still holds records from a
     *                               previous flush
     */
    private void flush() {
        // The compare-and-set claims the flushing role; only the winner proceeds.
        if (incomingList.size() > 0 && isFlushing.compareAndSet(false, true)) {
            try {
                if (log.isDebugEnabled()) {
                    log.debug("Starting flush, queue size: {}", incomingList.size());
                }
                if (!swapList.isEmpty()) {
                    throw new IllegalStateException("swapList should be empty since last flush. swapList.size: " + swapList.size());
                }
                // Swap the two lists under the instance lock so the batch to flush
                // is detached from the list that keeps receiving new records.
                synchronized (this) {
                    List<Record<T>> tmpList;
                    swapList.clear();

                    tmpList = swapList;
                    swapList = incomingList;
                    incomingList = tmpList;
                }

                int count = 0;
                try {
                    // bind each record value
                    for (Record<T> record : swapList) {
                        String action = record.getProperties().get(ACTION);
                        if (action == null) {
                            // Records without an explicit action are treated as inserts.
                            action = INSERT;
                        }
                        PreparedStatement statement;
                        switch (action) {
                            case DELETE:
                                statement = deleteStatement;
                                break;
                            case UPDATE:
                                statement = updateStatement;
                                break;
                            case INSERT:
                                statement = insertStatement;
                                break;
                            default:
                                String msg = String.format("Unsupported action %s, can be one of %s, or not set which indicate %s",
                                                           action, Arrays.asList(INSERT, UPDATE, DELETE), INSERT);
                                throw new IllegalArgumentException(msg);
                        }
                        bindValue(statement, record, action);
                        // Counted before execution, so a failing statement is still included.
                        count += 1;
                        statement.execute();
                    }
                    connection.commit();
                    swapList.forEach(Record::ack);
                } catch (Exception e) {
                    log.error("Got exception ", e);
                    // Discard statements already executed for this batch; otherwise the
                    // next successful commit would persist rows of records that were failed.
                    try {
                        connection.rollback();
                    } catch (Exception rollbackError) {
                        log.error("Failed to roll back after flush failure ", rollbackError);
                    }
                    swapList.forEach(Record::fail);
                }

                if (swapList.size() != count) {
                    log.error("Update count {}  not match total number of records {}", count, swapList.size());
                }

                // finish flush
                if (log.isDebugEnabled()) {
                    log.debug("Finish flush, queue size: {}", swapList.size());
                }
                swapList.clear();
            } finally {
                // Always release the flag so later flushes are not blocked forever.
                isFlushing.set(false);
            }
        } else {
            if (log.isDebugEnabled()) {
                log.debug("Already in flushing state, will not flush, queue size: {}", incomingList.size());
            }
        }
    }