in fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/FlussSourceBuilder.java [202:303]
/**
 * Validates the builder state, applies defaults, looks up the table metadata and creates the
 * {@link FlussSource}.
 *
 * <p>Steps, in order:
 *
 * <ol>
 *   <li>Check that bootstrap servers, database, table name and deserialization schema are set,
 *       and that database and table name are not empty.
 *   <li>Default the offsets initializer to {@code OffsetsInitializer.full()} and the partition
 *       discovery interval to the default of {@code
 *       FlinkConnectorOptions.SCAN_PARTITION_DISCOVERY_INTERVAL} when they were not set.
 *   <li>Fetch the {@code TableInfo} of the target table through a short-lived admin
 *       connection.
 *   <li>Translate projected field names (if any) into field indices of the table row type.
 *   <li>Add the table's custom properties and properties to the Fluss configuration.
 * </ol>
 *
 * <p>Note that this method writes the resolved defaults, the projected indices and the merged
 * configuration back into the builder's fields.
 *
 * @return a new {@link FlussSource} for the configured table
 * @throws IllegalArgumentException if the database or table name is empty, or if a projected
 *     field name does not exist in the table schema
 * @throws RuntimeException if the table info cannot be fetched (connection failure, failed or
 *     interrupted lookup); a missing required option is rejected by {@code checkNotNull}
 *     (presumably with a {@code NullPointerException} - confirm against the helper)
 */
public FlussSource<OUT> build() {
    checkNotNull(bootstrapServers, "BootstrapServers is required but not provided.");

    checkNotNull(database, "Database is required but not provided.");
    if (database.isEmpty()) {
        throw new IllegalArgumentException("Database must not be empty.");
    }

    checkNotNull(tableName, "TableName is required but not provided.");
    if (tableName.isEmpty()) {
        throw new IllegalArgumentException("TableName must not be empty.");
    }

    checkNotNull(deserializationSchema, "Deserialization schema is required but not provided.");

    // Fall back to the default offsets initializer when none was configured.
    if (offsetsInitializer == null) {
        offsetsInitializer = OffsetsInitializer.full();
    }

    // Fall back to the connector option's default discovery interval when none was configured.
    if (scanPartitionDiscoveryIntervalMs == null) {
        scanPartitionDiscoveryIntervalMs =
                FlinkConnectorOptions.SCAN_PARTITION_DISCOVERY_INTERVAL
                        .defaultValue()
                        .toMillis();
    }

    if (this.flussConf == null) {
        this.flussConf = new Configuration();
    }

    TablePath tablePath = new TablePath(this.database, this.tableName);
    // The bootstrap servers must be in the configuration before the connection is created.
    this.flussConf.setString(ConfigOptions.BOOTSTRAP_SERVERS.key(), bootstrapServers);

    TableInfo tableInfo = loadTableInfo(tablePath);

    if (this.projectedFieldNames != null && this.projectedFieldNames.length > 0) {
        this.projectedFields =
                resolveProjectedFieldIndices(this.projectedFieldNames, tableInfo.getRowType());
    }

    // Added after the lookup, so table-level entries are applied on top of the existing ones.
    flussConf.addAll(tableInfo.getCustomProperties());
    flussConf.addAll(tableInfo.getProperties());

    boolean isPartitioned = !tableInfo.getPartitionKeys().isEmpty();
    boolean hasPrimaryKey = !tableInfo.getPrimaryKeys().isEmpty();

    RowType sourceOutputType =
            projectedFields != null
                    ? tableInfo.getRowType().project(projectedFields)
                    : tableInfo.getRowType();

    LOG.info("Creating Fluss Source with Configuration: {}", flussConf);

    // NOTE(review): the meaning of the trailing 'true' flag is not visible from this method;
    // confirm against the FlussSource constructor.
    return new FlussSource<>(
            flussConf,
            tablePath,
            hasPrimaryKey,
            isPartitioned,
            sourceOutputType,
            projectedFields,
            offsetsInitializer,
            scanPartitionDiscoveryIntervalMs,
            deserializationSchema,
            true);
}

/**
 * Fetches the table info for {@code tablePath} using a connection created from the current
 * Fluss configuration; the connection and admin client are closed before returning.
 *
 * <p>Each failure mode is wrapped exactly once: interruption and lookup failures keep their
 * own message instead of being re-wrapped as a connection initialization failure.
 */
private TableInfo loadTableInfo(TablePath tablePath) {
    try (Connection connection = ConnectionFactory.createConnection(flussConf);
            Admin admin = connection.getAdmin()) {
        return admin.getTableInfo(tablePath).get();
    } catch (InterruptedException e) {
        // Restore the interrupt flag so callers further up can still observe it.
        Thread.currentThread().interrupt();
        throw new RuntimeException("Interrupted while getting table info", e);
    } catch (ExecutionException e) {
        throw new RuntimeException("Failed to get table info", e);
    } catch (Exception e) {
        // Remaining failures: creating the connection/admin client or closing them.
        throw new RuntimeException(
                "Failed to initialize FlussSource admin connection: " + e.getMessage(), e);
    }
}

/**
 * Maps each projected field name to its position in {@code rowType}, preserving the order of
 * {@code fieldNames}.
 *
 * @throws IllegalArgumentException if a name is not a field of {@code rowType}
 */
private static int[] resolveProjectedFieldIndices(String[] fieldNames, RowType rowType) {
    List<String> allFieldNames = rowType.getFieldNames();

    // Index the schema once so every lookup below is a map access.
    Map<String, Integer> fieldNameToIndex = new HashMap<>();
    for (int i = 0; i < allFieldNames.size(); i++) {
        fieldNameToIndex.put(allFieldNames.get(i), i);
    }

    int[] indices = new int[fieldNames.length];
    for (int i = 0; i < fieldNames.length; i++) {
        String fieldName = fieldNames[i];
        Integer index = fieldNameToIndex.get(fieldName);
        if (index == null) {
            throw new IllegalArgumentException(
                    "Field name '"
                            + fieldName
                            + "' not found in table schema. "
                            + "Available fields: "
                            + String.join(", ", allFieldNames));
        }
        indices[i] = index;
    }
    return indices;
}