in src/main/java/org/apache/doris/kafka/connector/writer/DorisWriter.java [62:95]
public DorisWriter(
String tableName,
String topic,
int partition,
DorisOptions dorisOptions,
ConnectionProvider connectionProvider,
DorisConnectMonitor connectMonitor) {
this.topic = topic;
this.partition = partition;
this.tableName = tableName;
if (StringUtils.isNotEmpty(dorisOptions.getDatabase())) {
this.dbName = dorisOptions.getDatabase();
} else if (tableName.contains(".")) {
String[] dbTbl = tableName.split("\\.");
this.dbName = dbTbl[0];
this.tableName = dbTbl[1];
} else {
LOG.error("Error params database {}, table {}, topic {}", dbName, tableName, topic);
throw new ArgumentsException("Failed to get database and table names");
}
this.tableIdentifier = dbName + "." + this.tableName;
this.fileNames = new ArrayList<>();
this.buffer = new RecordBuffer();
this.processedOffset = new AtomicLong(-1);
this.flushedOffset = new AtomicLong(-1);
this.committedOffset = new AtomicLong(0);
this.previousFlushTimeStamp = System.currentTimeMillis();
this.dorisOptions = dorisOptions;
this.connectionProvider = connectionProvider;
this.recordService = new RecordService(dorisOptions);
this.connectMonitor = connectMonitor;
}