in flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraSourceReaderFactory.java [35:54]
/**
 * Creates a {@link CassandraSourceReader} backed by a freshly opened Cassandra connection.
 *
 * <p>Obtains a {@link Cluster} from the given builder, opens a {@link Session} on it, builds a
 * {@link Mapper} for {@code pojoClass} and, when options are supplied, installs them as the
 * mapper's default "get" options. The cluster, session and mapper are then handed to the new
 * reader.
 *
 * <p>If any step after the cluster has been obtained fails, the session (if already opened) and
 * the cluster are closed before the original failure is rethrown, so a failed creation does not
 * leave driver resources open. On success the cluster and session are passed to the reader
 * still open; presumably the reader is responsible for closing them (confirm against {@code
 * CassandraSourceReader}).
 *
 * @param context the source reader context passed through to the reader
 * @param clusterBuilder builder used to obtain the Cassandra {@link Cluster}
 * @param pojoClass the class the mapper is created for
 * @param query the query passed through to the reader
 * @param keyspace the keyspace passed through to the reader
 * @param table the table passed through to the reader
 * @param mapperOptions optional mapper options; may be {@code null}, and a {@code null} options
 *     array is ignored as well
 * @return a new reader wired with the opened cluster, session and mapper
 */
public CassandraSourceReader<OUT> create(
        SourceReaderContext context,
        ClusterBuilder clusterBuilder,
        Class<OUT> pojoClass,
        String query,
        String keyspace,
        String table,
        MapperOptions mapperOptions) {
    final Cluster cluster = clusterBuilder.getCluster();
    Session session = null;
    try {
        session = cluster.connect();
        final Mapper<OUT> mapper = new MappingManager(session).mapper(pojoClass);
        if (mapperOptions != null) {
            final Mapper.Option[] optionsArray = mapperOptions.getMapperOptions();
            if (optionsArray != null) {
                mapper.setDefaultGetOptions(optionsArray);
            }
        }
        return new CassandraSourceReader<>(
                context, query, keyspace, table, cluster, session, mapper);
    } catch (RuntimeException | Error failure) {
        // Creation failed part-way: release whatever was already opened so the connection
        // does not leak. Close failures are attached as suppressed so the root cause is
        // the exception that propagates.
        if (session != null) {
            try {
                session.close();
            } catch (RuntimeException closeFailure) {
                failure.addSuppressed(closeFailure);
            }
        }
        try {
            cluster.close();
        } catch (RuntimeException closeFailure) {
            failure.addSuppressed(closeFailure);
        }
        throw failure;
    }
}