public JdbcWriter()

in flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/sink/writer/JdbcWriter.java [70:130]


    /**
     * Creates the writer, validates its arguments and opens the underlying JDBC output.
     *
     * <p>All unconditional arguments are validated before any of them is dereferenced and before
     * any transaction or output is opened, so an invalid argument fails fast with a descriptive
     * message and without resources having been acquired by this constructor.
     *
     * <p>When {@code deliveryGuarantee} is {@link DeliveryGuarantee#EXACTLY_ONCE}, an {@link
     * XaTransaction} is additionally created, opened with the recovered state (or an empty state
     * if none was recovered) and a transaction is created for the last checkpoint id.
     *
     * @param connectionProvider provider handed to the JDBC output; it is cast to {@link
     *     XaConnectionProvider} when running with {@code EXACTLY_ONCE}
     * @param executionOptions execution options; {@code getMaxRetries()} must be 0 for {@code
     *     EXACTLY_ONCE}
     * @param exactlyOnceOptions XA options; only required for {@code EXACTLY_ONCE}
     * @param queryStatement supplies the query and the statement setter used by the batch executor
     * @param outputSerializer serializer passed to the JDBC output when it is opened
     * @param deliveryGuarantee delivery guarantee selecting the plain or the XA code path
     * @param recoveredState recovered writer states; only required for {@code EXACTLY_ONCE}, where
     *     at most one element is allowed
     * @param initContext sink init context providing the restored checkpoint id and job/task info
     * @throws IOException declared by the signature; presumably raised while opening the
     *     transaction or the JDBC output (their implementations are outside this block)
     * @throws NullPointerException if a required argument is {@code null}
     * @throws IllegalArgumentException if {@code EXACTLY_ONCE} is combined with {@code maxRetries
     *     != 0}
     * @throws IllegalStateException if more than one state is passed for recovery
     */
    public JdbcWriter(
            JdbcConnectionProvider connectionProvider,
            JdbcExecutionOptions executionOptions,
            JdbcExactlyOnceOptions exactlyOnceOptions,
            JdbcQueryStatement<IN> queryStatement,
            JdbcOutputSerializer<IN> outputSerializer,
            DeliveryGuarantee deliveryGuarantee,
            Collection<JdbcWriterState> recoveredState,
            InitContext initContext)
            throws IOException {

        // Validate every unconditional argument up front: executionOptions is dereferenced in
        // the EXACTLY_ONCE branch below, and nothing should be opened if an argument is invalid.
        this.deliveryGuarantee =
                checkNotNull(deliveryGuarantee, "deliveryGuarantee must be defined");
        checkNotNull(initContext, "initContext must be defined");
        checkNotNull(connectionProvider, "connectionProvider must be defined");
        checkNotNull(executionOptions, "executionOptions must be defined");
        checkNotNull(queryStatement, "queryStatement must be defined");

        pendingRecords = false;
        // Without a restored checkpoint, start one below the initial checkpoint id.
        this.lastCheckpointId =
                initContext.getRestoredCheckpointId().orElse(InitContext.INITIAL_CHECKPOINT_ID - 1);

        if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) {
            checkArgument(
                    executionOptions.getMaxRetries() == 0,
                    "JDBC XA sink requires maxRetries equal to 0, otherwise it could "
                            + "cause duplicates. See issue FLINK-22311 for details.");

            checkNotNull(exactlyOnceOptions, "exactlyOnceOptions must be defined");
            checkNotNull(recoveredState, "recoveredState must be defined");
            checkState(recoveredState.size() <= 1, "more than one state to recover");

            JdbcWriterState state =
                    recoveredState.stream().findFirst().orElse(JdbcWriterState.empty());

            TransactionId transactionId =
                    TransactionId.create(
                            initContext.getJobInfo().getJobId().getBytes(),
                            initContext.getTaskInfo().getIndexOfThisSubtask(),
                            initContext.getTaskInfo().getNumberOfParallelSubtasks());

            this.jdbcTransaction =
                    new XaTransaction(
                            exactlyOnceOptions,
                            transactionId,
                            (XaConnectionProvider) connectionProvider);
            this.jdbcTransaction.open(state);
            this.jdbcTransaction.createTx(lastCheckpointId);
        }

        this.jdbcOutput =
                new JdbcOutputFormat<>(
                        connectionProvider,
                        executionOptions,
                        () ->
                                JdbcBatchStatementExecutor.simple(
                                        queryStatement.query(), queryStatement::statement));
        this.jdbcOutput.open(outputSerializer);
    }