public FlussSource build()

in fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/FlussSourceBuilder.java [202:303]


    public FlussSource<OUT> build() {
        checkNotNull(bootstrapServers, "BootstrapServers is required but not provided.");
        checkNotNull(database, "Database is required but not provided.");
        if (database.isEmpty()) {
            throw new IllegalArgumentException("Database must not be empty.");
        }
        checkNotNull(tableName, "TableName is required but not provided.");
        if (tableName.isEmpty()) {
            throw new IllegalArgumentException("TableName must not be empty.");
        }
        checkNotNull(deserializationSchema, "Deserialization schema is required but not provided.");

        // if null use the default value:
        if (offsetsInitializer == null) {
            offsetsInitializer = OffsetsInitializer.full();
        }

        // if null use the default value:
        if (scanPartitionDiscoveryIntervalMs == null) {
            scanPartitionDiscoveryIntervalMs =
                    FlinkConnectorOptions.SCAN_PARTITION_DISCOVERY_INTERVAL
                            .defaultValue()
                            .toMillis();
        }

        if (this.flussConf == null) {
            this.flussConf = new Configuration();
        }

        TablePath tablePath = new TablePath(this.database, this.tableName);
        this.flussConf.setString(ConfigOptions.BOOTSTRAP_SERVERS.key(), bootstrapServers);
        TableInfo tableInfo;
        try (Connection connection = ConnectionFactory.createConnection(flussConf);
                Admin admin = connection.getAdmin()) {
            try {
                tableInfo = admin.getTableInfo(tablePath).get();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("Interrupted while getting table info", e);
            } catch (ExecutionException e) {
                throw new RuntimeException("Failed to get table info", e);
            }
        } catch (Exception e) {
            throw new RuntimeException(
                    "Failed to initialize FlussSource admin connection: " + e.getMessage(), e);
        }

        if (this.projectedFieldNames != null && this.projectedFieldNames.length > 0) {
            RowType rowType = tableInfo.getRowType();
            List<String> allFieldNames = rowType.getFieldNames();

            // Create a map of field name to index
            Map<String, Integer> fieldNameToIndex = new HashMap<>();
            for (int i = 0; i < allFieldNames.size(); i++) {
                fieldNameToIndex.put(allFieldNames.get(i), i);
            }

            int[] indices = new int[projectedFieldNames.length];
            for (int i = 0; i < projectedFieldNames.length; i++) {
                String fieldName = projectedFieldNames[i];
                Integer index = fieldNameToIndex.get(fieldName);

                if (index == null) {
                    throw new IllegalArgumentException(
                            "Field name '"
                                    + fieldName
                                    + "' not found in table schema. "
                                    + "Available fields: "
                                    + String.join(", ", allFieldNames));
                }

                indices[i] = index;
            }

            this.projectedFields = indices;
        }

        flussConf.addAll(tableInfo.getCustomProperties());
        flussConf.addAll(tableInfo.getProperties());

        boolean isPartitioned = !tableInfo.getPartitionKeys().isEmpty();
        boolean hasPrimaryKey = !tableInfo.getPrimaryKeys().isEmpty();

        RowType sourceOutputType =
                projectedFields != null
                        ? tableInfo.getRowType().project(projectedFields)
                        : tableInfo.getRowType();

        LOG.info("Creating Fluss Source with Configuration: {}", flussConf);

        return new FlussSource<>(
                flussConf,
                tablePath,
                hasPrimaryKey,
                isPartitioned,
                sourceOutputType,
                projectedFields,
                offsetsInitializer,
                scanPartitionDiscoveryIntervalMs,
                deserializationSchema,
                true);
    }