public ScanRuntimeProvider getScanRuntimeProvider()

in flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableSource.java [136:161]


    /**
     * Builds the runtime provider that reads rows through a {@link MongoSource}.
     *
     * <p>The row type and type information are both derived from {@code producedDataType} and
     * passed to a {@link MongoRowDataDeserializationSchema}. The source itself is configured from
     * {@code connectionOptions}, {@code readOptions}, {@code limit}, {@code filter} and the field
     * names of {@code producedDataType}.
     *
     * @param runtimeProviderContext context used to create the {@link TypeInformation} for the
     *     produced data type
     * @return a {@link SourceProvider} wrapping the configured {@link MongoSource}
     */
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
        // Logical row layout and matching type information of the produced rows.
        final RowType producedRowType = (RowType) producedDataType.getLogicalType();
        final TypeInformation<RowData> producedTypeInfo =
                runtimeProviderContext.createTypeInformation(producedDataType);

        // Schema handed to the source for producing RowData records.
        final MongoDeserializationSchema<RowData> rowDataDeserializer =
                new MongoRowDataDeserializationSchema(producedRowType, producedTypeInfo);

        return SourceProvider.of(
                MongoSource.<RowData>builder()
                        // Connection target.
                        .setUri(connectionOptions.getUri())
                        .setDatabase(connectionOptions.getDatabase())
                        .setCollection(connectionOptions.getCollection())
                        // Read options.
                        .setFetchSize(readOptions.getFetchSize())
                        .setNoCursorTimeout(readOptions.isNoCursorTimeout())
                        .setPartitionStrategy(readOptions.getPartitionStrategy())
                        .setPartitionSize(readOptions.getPartitionSize())
                        .setSamplesPerPartition(readOptions.getSamplesPerPartition())
                        // Limit, projected field names and filter held by this table source.
                        .setLimit(limit)
                        .setProjectedFields(DataType.getFieldNames(producedDataType))
                        .setFilter(filter)
                        .setDeserializationSchema(rowDataDeserializer)
                        .build());
    }