in flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraAppendTableSink.java [84:102]
public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
if (!(dataStream.getType() instanceof RowTypeInfo)) {
throw new TableException(
"No support for the type of the given DataStream: " + dataStream.getType());
}
CassandraRowSink sink =
new CassandraRowSink(
dataStream.getType().getArity(),
cql,
builder,
CassandraSinkBaseConfig.newBuilder().build(),
new NoOpCassandraFailureHandler());
return dataStream
.addSink(sink)
.setParallelism(dataStream.getParallelism())
.name(TableConnectorUtils.generateRuntimeName(this.getClass(), fieldNames));
}