in cassandra-analytics-core-example/src/main/java/org/apache/cassandra/spark/example/SampleCassandraJob.java [234:269]
/**
 * Generates a synthetic dataset of exactly {@code records} rows spread across
 * {@code parallelism} Spark partitions.
 *
 * Each row holds (recordNumber, courseName-bytes, recordNumber), optionally followed by a
 * TTL column and/or a write-timestamp column depending on the flags. Record numbers are
 * contiguous from 0 to {@code records - 1} with no gaps or duplicates.
 *
 * @param sc                 Spark context used to parallelize the seed sequence
 * @param records            total number of rows to generate
 * @param parallelism        number of Spark partitions to distribute the rows over
 * @param addTTLColumn       when true, append a TTL column (fixed at 120 seconds)
 * @param addTimestampColumn when true, append a write-timestamp column (microseconds)
 * @return an RDD containing the generated rows
 */
private static JavaRDD<Row> genDataset(JavaSparkContext sc, long records, Integer parallelism,
boolean addTTLColumn, boolean addTimestampColumn)
{
long recordsPerPartition = records / parallelism;
long remainder = records - (recordsPerPartition * parallelism);
List<Integer> seq = IntStream.range(0, parallelism).boxed().collect(Collectors.toList());
int ttl = 120; // data will not be queryable in two minutes
long timeStamp = System.currentTimeMillis() * 1000; // microseconds, the granularity Cassandra uses for writetime
JavaRDD<Row> dataset = sc.parallelize(seq, parallelism).mapPartitionsWithIndex(
(Function2<Integer, Iterator<Integer>, Iterator<Row>>) (index, integerIterator) -> {
// Widen before multiplying to avoid int overflow for large datasets
long firstRecordNumber = (long) index * recordsPerPartition;
// BUG FIX: partition indexes run 0..parallelism-1, so the original check
// `index.equals(parallelism)` never matched and the `remainder` rows were
// silently dropped. Assign the leftover rows to the last partition so the
// total row count is exactly `records`.
long recordsToGenerate = (index == parallelism - 1)
                         ? recordsPerPartition + remainder
                         : recordsPerPartition;
java.util.Iterator<Row> rows = LongStream.range(0, recordsToGenerate).mapToObj(offset -> {
long recordNumber = firstRecordNumber + offset;
String courseNameString = String.valueOf(recordNumber);
Integer courseNameStringLen = courseNameString.length();
// Pad the course name out to roughly 1000 bytes by repetition
Integer courseNameMultiplier = 1000 / courseNameStringLen;
byte[] courseName = dupStringAsBytes(courseNameString, courseNameMultiplier);
if (addTTLColumn && addTimestampColumn)
{
return RowFactory.create(recordNumber, courseName, recordNumber, ttl, timeStamp);
}
if (addTTLColumn)
{
return RowFactory.create(recordNumber, courseName, recordNumber, ttl);
}
if (addTimestampColumn)
{
return RowFactory.create(recordNumber, courseName, recordNumber, timeStamp);
}
return RowFactory.create(recordNumber, courseName, recordNumber);
}).iterator();
return rows;
}, false);
return dataset;
}