in cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/bridge/CdcBridgeImplementation.java [181:300]
public static Mutation makeMutation(TimeProvider timeProvider, CqlTable cqlTable, Row row, long timestamp)
{
final TableMetadata table = Schema.instance.getTableMetadata(cqlTable.keyspace(), cqlTable.table());
assert table != null;
final org.apache.cassandra.db.rows.Row.Builder rowBuilder = BTreeRow.sortedBuilder();
if (row.isInsert())
{
rowBuilder.addPrimaryKeyLivenessInfo(LivenessInfo.create(timestamp, timeProvider.nowInSeconds()));
}
org.apache.cassandra.db.rows.Row staticRow = Rows.EMPTY_STATIC_ROW;
// build partition key
final List<CqlField> partitionKeys = cqlTable.partitionKeys();
final ByteBuffer partitionKey = ByteBufferUtils.buildPartitionKey(partitionKeys,
partitionKeys.stream()
.map(f -> row.get(f.position()))
.toArray());
final DecoratedKey decoratedPartitionKey = table.partitioner.decorateKey(partitionKey);
// create a mutation and return early
if (isPartitionDeletion(cqlTable, row))
{
PartitionUpdate delete = PartitionUpdate.fullPartitionDelete(table, partitionKey, timestamp, timeProvider.nowInSeconds());
return new Mutation(delete);
}
final List<CqlField> clusteringKeys = cqlTable.clusteringKeys();
// create a mutation with rangetombstones
if (row.rangeTombstones() != null && !row.rangeTombstones().isEmpty())
{
return makeRangeTombstone(cqlTable, table, decoratedPartitionKey, timestamp, timeProvider, row);
}
// When the test row data (IRow) defines no regular row, noRegularRow is true. It happens when clustering keys are defined, but not set.
boolean noRegularRow = false;
// build clustering key
if (clusteringKeys.isEmpty())
{
rowBuilder.newRow(Clustering.EMPTY);
}
else if (clusteringKeys.stream().allMatch(f -> row.get(f.position()) == null))
{
// clustering key is defined, but not set ==> no regular row
noRegularRow = true;
}
else
{
rowBuilder.newRow(Clustering.make(
clusteringKeys.stream()
.map(f -> f.serialize(row.get(f.position())))
.toArray(ByteBuffer[]::new))
);
}
if (row.isDeleted())
{
rowBuilder.addRowDeletion(org.apache.cassandra.db.rows.Row.Deletion.regular(new DeletionTime(timestamp, timeProvider.nowInSeconds())));
}
else
{
BiConsumer<org.apache.cassandra.db.rows.Row.Builder, CqlField> rowBuildFunc = (builder, field) -> {
final CqlType type = (CqlType) field.type();
final ColumnMetadata cd = table.getColumn(new ColumnIdentifier(field.name(), false));
Object value = row.get(field.position());
if (value != UNSET_MARKER) // if unset, do not add the cell
{
if (value == null)
{
if (cd.isComplex())
{
type.addComplexTombstone(builder, cd, timestamp);
}
else
{
type.addTombstone(builder, cd, timestamp);
}
}
else if (value instanceof CollectionElement)
{
CollectionElement ce = (CollectionElement) value;
if (ce.value == null)
{
type.addTombstone(builder, cd, timestamp, ce.cellPath);
}
else
{
type.addCell(builder, cd, timestamp, row.ttl(), timeProvider.nowInSeconds(), ce.value, ce.cellPath);
}
}
else
{
type.addCell(builder, cd, timestamp, row.ttl(), timeProvider.nowInSeconds(), value);
}
}
};
if (!cqlTable.staticColumns().isEmpty())
{
org.apache.cassandra.db.rows.Row.Builder staticRowBuilder = BTreeRow.sortedBuilder();
staticRowBuilder.newRow(Clustering.STATIC_CLUSTERING);
for (final CqlField field : cqlTable.staticColumns())
{
rowBuildFunc.accept(staticRowBuilder, field);
}
staticRow = staticRowBuilder.build(); // replace the empty row with the new static row built
}
// build value cells
for (final CqlField field : cqlTable.valueColumns())
{
rowBuildFunc.accept(rowBuilder, field);
}
}
return new Mutation(PartitionUpdate.singleRowUpdate(table, decoratedPartitionKey,
noRegularRow ? null : rowBuilder.build(), // regular row
staticRow)); // static row
}