in pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java [73:110]
public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
jdbcSinkConfig = JdbcSinkConfig.load(config);
jdbcUrl = jdbcSinkConfig.getJdbcUrl();
if (jdbcSinkConfig.getJdbcUrl() == null) {
throw new IllegalArgumentException("Required jdbc Url not set.");
}
Properties properties = new Properties();
String username = jdbcSinkConfig.getUserName();
String password = jdbcSinkConfig.getPassword();
if (username != null) {
properties.setProperty("user", username);
}
if (password != null) {
properties.setProperty("password", password);
}
Class.forName(JdbcUtils.getDriverClassName(jdbcSinkConfig.getJdbcUrl()));
connection = DriverManager.getConnection(jdbcSinkConfig.getJdbcUrl(), properties);
connection.setAutoCommit(false);
log.info("Opened jdbc connection: {}, autoCommit: {}", jdbcUrl, connection.getAutoCommit());
tableName = jdbcSinkConfig.getTableName();
tableId = JdbcUtils.getTableId(connection, tableName);
// Init PreparedStatement include insert, delete, update
initStatement();
int timeoutMs = jdbcSinkConfig.getTimeoutMs();
batchSize = jdbcSinkConfig.getBatchSize();
incomingList = Lists.newArrayList();
swapList = Lists.newArrayList();
isFlushing = new AtomicBoolean(false);
flushExecutor = Executors.newScheduledThreadPool(1);
flushExecutor.scheduleAtFixedRate(this::flush, timeoutMs, timeoutMs, TimeUnit.MILLISECONDS);
}