in flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/sink/writer/JdbcWriter.java [70:130]
/**
 * Creates a writer and eagerly opens the resources it needs.
 *
 * <p>All unconditional arguments are validated before any resource is opened, so an invalid
 * configuration fails fast and never leaves a half-initialised XA transaction behind.
 *
 * <p>When {@code deliveryGuarantee} is {@link DeliveryGuarantee#EXACTLY_ONCE}, an
 * {@link XaTransaction} is created, opened with the recovered state (or an empty state when
 * nothing was recovered) and a transaction is started for the last known checkpoint id. In
 * every mode a {@link JdbcOutputFormat} is created and opened with {@code outputSerializer}.
 *
 * @param connectionProvider provider of JDBC connections; must not be null and is cast to
 *     {@link XaConnectionProvider} in exactly-once mode
 * @param executionOptions execution options; must not be null, and {@code getMaxRetries()}
 *     must be 0 in exactly-once mode
 * @param exactlyOnceOptions XA options; must not be null in exactly-once mode
 * @param queryStatement supplies the SQL text and the statement binder; must not be null
 * @param outputSerializer serializer handed to the output format when it is opened
 * @param deliveryGuarantee requested delivery guarantee; must not be null
 * @param recoveredState writer state to recover from; in exactly-once mode it must not be
 *     null and may contain at most one element
 * @param initContext sink initialisation context; must not be null
 * @throws IOException if one of the setup calls made here fails with an I/O error
 */
public JdbcWriter(
        JdbcConnectionProvider connectionProvider,
        JdbcExecutionOptions executionOptions,
        JdbcExactlyOnceOptions exactlyOnceOptions,
        JdbcQueryStatement<IN> queryStatement,
        JdbcOutputSerializer<IN> outputSerializer,
        DeliveryGuarantee deliveryGuarantee,
        Collection<JdbcWriterState> recoveredState,
        InitContext initContext)
        throws IOException {
    // Validate every unconditional argument up front, before anything is dereferenced or
    // any connection/transaction is opened.
    this.deliveryGuarantee =
            checkNotNull(deliveryGuarantee, "deliveryGuarantee must be defined");
    checkNotNull(initContext, "initContext must be defined");
    checkNotNull(connectionProvider, "connectionProvider must be defined");
    checkNotNull(executionOptions, "executionOptions must be defined");
    checkNotNull(queryStatement, "queryStatement must be defined");

    pendingRecords = false;
    // Without a restored checkpoint, start one below the initial checkpoint id.
    this.lastCheckpointId =
            initContext.getRestoredCheckpointId().orElse(InitContext.INITIAL_CHECKPOINT_ID - 1);

    if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) {
        checkArgument(
                executionOptions.getMaxRetries() == 0,
                "JDBC XA sink requires maxRetries equal to 0, otherwise it could "
                        + "cause duplicates. See issue FLINK-22311 for details.");
        checkNotNull(exactlyOnceOptions, "exactlyOnceOptions must be defined");
        checkNotNull(recoveredState, "recoveredState must be defined");
        checkState(recoveredState.size() <= 1, "more than one state to recover");

        JdbcWriterState state =
                recoveredState.stream().findFirst().orElse(JdbcWriterState.empty());
        TransactionId transactionId =
                TransactionId.create(
                        initContext.getJobInfo().getJobId().getBytes(),
                        initContext.getTaskInfo().getIndexOfThisSubtask(),
                        initContext.getTaskInfo().getNumberOfParallelSubtasks());

        this.jdbcTransaction =
                new XaTransaction(
                        exactlyOnceOptions,
                        transactionId,
                        ((XaConnectionProvider) connectionProvider));
        this.jdbcTransaction.open(state);
        this.jdbcTransaction.createTx(lastCheckpointId);
    }

    this.jdbcOutput =
            new JdbcOutputFormat<>(
                    connectionProvider,
                    executionOptions,
                    () ->
                            JdbcBatchStatementExecutor.simple(
                                    queryStatement.query(), queryStatement::statement));
    this.jdbcOutput.open(outputSerializer);
}