in cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TableSchema.java [159:194]
/**
 * Assembles the CQL {@code INSERT} statement text for the given DataFrame schema and target table.
 * <p>
 * Every field name of {@code dfSchema} is emitted both as a column and as a {@code :name} placeholder,
 * in schema order, except for fields whose name equals {@code ttlOption.columnName()} or
 * {@code timestampOption.columnName()}. Depending on which options are enabled, one of
 * {@code USING TIMESTAMP ... AND TTL ...}, {@code USING TIMESTAMP ...} or {@code USING TTL ...}
 * is appended before the terminating {@code ;}. The finished statement is logged at INFO level.
 *
 * @param dfSchema        schema whose field names supply the column list
 * @param tableInfo       supplies the keyspace and table name for the {@code INSERT INTO} target
 * @param ttlOption       TTL settings; its string form is appended verbatim when TTL is enabled
 * @param timestampOption timestamp settings; its string form is appended verbatim when enabled
 * @return the complete insert statement, terminated with {@code ;}
 */
private static String getInsertStatement(StructType dfSchema, TableInfoProvider tableInfo,
                                         TTLOption ttlOption, TimestampOption timestampOption)
{
    // Columns carrying the TTL / timestamp values are not regular table columns, so drop them
    List<String> columns = Arrays.stream(dfSchema.fieldNames())
                                 .filter(name -> !(name.equals(ttlOption.columnName())
                                                   || name.equals(timestampOption.columnName())))
                                 .collect(Collectors.toList());

    StringBuilder cql = new StringBuilder("INSERT INTO ");
    cql.append(tableInfo.getKeyspaceName())
       .append('.')
       .append(tableInfo.getName());

    // Column list: " (a,b,c) "
    cql.append(" (")
       .append(String.join(",", columns))
       .append(") ");

    // Placeholder list: "VALUES (:a,:b,:c)"
    cql.append("VALUES")
       .append(" (");
    String separator = "";
    for (String column : columns)
    {
        cql.append(separator).append(':').append(column);
        separator = ",";
    }
    cql.append(')');

    boolean useTtl = ttlOption.withTTl();
    boolean useTimestamp = timestampOption.withTimestamp();
    if (useTimestamp)
    {
        // The option objects are appended directly, so their toString() provides the clause value
        cql.append(" USING TIMESTAMP ").append(timestampOption);
        if (useTtl)
        {
            cql.append(" AND TTL ").append(ttlOption);
        }
    }
    else if (useTtl)
    {
        cql.append(" USING TTL ").append(ttlOption);
    }

    String insertStatement = cql.append(';').toString();
    LOGGER.info("CQL insert statement for the RDD {}", insertStatement);
    return insertStatement;
}