in hugegraph-cassandra/src/main/java/org/apache/hugegraph/backend/store/cassandra/CassandraStore.java [219:301]
private void mutate(CassandraSessionPool.Session session,
BackendAction item) {
CassandraBackendEntry entry = castBackendEntry(item.entry());
// Check if the entry has no change
if (!entry.selfChanged() && entry.subRows().isEmpty()) {
LOG.warn("The entry will be ignored due to no change: {}", entry);
}
CassandraTable table;
if (!entry.olap()) {
// Oltp table
table = this.table(entry.type());
} else {
if (entry.type().isIndex()) {
// Olap index
table = this.table(this.olapTableName(entry.type()));
} else {
// Olap vertex
table = this.table(this.olapTableName(entry.subId()));
}
}
switch (item.action()) {
case INSERT:
// Insert entry
if (entry.selfChanged()) {
table.insert(session, entry.row());
}
// Insert sub rows (edges)
for (CassandraBackendEntry.Row row : entry.subRows()) {
this.table(row.type()).insert(session, row);
}
break;
case DELETE:
// Delete entry
if (entry.selfChanged()) {
table.delete(session, entry.row());
}
// Delete sub rows (edges)
for (CassandraBackendEntry.Row row : entry.subRows()) {
this.table(row.type()).delete(session, row);
}
break;
case APPEND:
// Append entry
if (entry.selfChanged()) {
table.append(session, entry.row());
}
// Append sub rows (edges)
for (CassandraBackendEntry.Row row : entry.subRows()) {
this.table(row.type()).append(session, row);
}
break;
case ELIMINATE:
// Eliminate entry
if (entry.selfChanged()) {
table.eliminate(session, entry.row());
}
// Eliminate sub rows (edges)
for (CassandraBackendEntry.Row row : entry.subRows()) {
this.table(row.type()).eliminate(session, row);
}
break;
case UPDATE_IF_PRESENT:
if (entry.selfChanged()) {
// TODO: forward to master-writer node
table.updateIfPresent(session, entry.row());
}
assert entry.subRows().isEmpty() : entry.subRows();
break;
case UPDATE_IF_ABSENT:
if (entry.selfChanged()) {
// TODO: forward to master-writer node
table.updateIfAbsent(session, entry.row());
}
assert entry.subRows().isEmpty() : entry.subRows();
break;
default:
throw new AssertionError(String.format(
"Unsupported mutate action: %s", item.action()));
}
}