public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream)

in flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraAppendTableSink.java [84:102]


    /**
     * Attaches a {@link CassandraRowSink} to the given stream of rows.
     *
     * <p>The sink is created from the arity of the stream's type, this table sink's {@code cql}
     * and {@code builder}, a {@link CassandraSinkBaseConfig} built from an unmodified builder,
     * and a {@link NoOpCassandraFailureHandler}. The added operator is given the parallelism of
     * the input stream and a name produced by {@code TableConnectorUtils.generateRuntimeName}
     * from this class and {@code fieldNames}.
     *
     * @param dataStream the stream to write out; its type must be a {@link RowTypeInfo}
     * @return the sink operator that was added to {@code dataStream}
     * @throws TableException if the type of {@code dataStream} is not a {@link RowTypeInfo}
     */
    public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
        // Only row-typed streams are supported; reject anything else up front.
        final boolean isRowTyped = dataStream.getType() instanceof RowTypeInfo;
        if (!isRowTyped) {
            throw new TableException(
                    "No support for the type of the given DataStream: " + dataStream.getType());
        }

        // Gather the constructor arguments in the same order they were originally evaluated.
        final int rowArity = dataStream.getType().getArity();
        final CassandraSinkBaseConfig sinkConfig = CassandraSinkBaseConfig.newBuilder().build();
        final NoOpCassandraFailureHandler failureHandler = new NoOpCassandraFailureHandler();
        final CassandraRowSink rowSink =
                new CassandraRowSink(rowArity, cql, builder, sinkConfig, failureHandler);

        // Register the sink first, then align its parallelism with the input and label it.
        final DataStreamSink<?> attachedSink = dataStream.addSink(rowSink);
        final DataStreamSink<?> parallelSink =
                attachedSink.setParallelism(dataStream.getParallelism());
        final String runtimeName =
                TableConnectorUtils.generateRuntimeName(getClass(), fieldNames);
        return parallelSink.name(runtimeName);
    }