in fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/event/watcher/TableChangeWatcher.java [84:159]
public void event(Type type, ChildData oldData, ChildData newData) {
    // Translates ZooKeeper node events into coordinator actions:
    //   NODE_CREATED -> a partition node may have been created
    //   NODE_CHANGED -> a table node received its table info (i.e. table creation)
    //   NODE_DELETED -> a partition node or a table node was removed
    // Paths that parse as neither a partition node nor a table node are ignored.

    // Deletion handling below reads the removed node from oldData, so fall back to it
    // here; otherwise delete events would be logged without their path.
    ChildData loggedData = newData != null ? newData : oldData;
    if (loggedData != null) {
        LOG.debug("Received {} event (path: {})", type, loggedData.getPath());
    } else {
        LOG.debug("Received {} event", type);
    }

    switch (type) {
        case NODE_CREATED:
            {
                if (newData == null) {
                    break;
                }
                // maybe a partition node was created;
                // try to parse the path as a table partition node
                PhysicalTablePath physicalTablePath =
                        PartitionZNode.parsePath(newData.getPath());
                if (physicalTablePath != null) {
                    assert physicalTablePath.getPartitionName() != null;
                    processCreatePartition(
                            physicalTablePath.getTablePath(),
                            physicalTablePath.getPartitionName(),
                            newData);
                }
                break;
            }
        case NODE_CHANGED:
            {
                // The path for the table is first created in zk when creating the schema
                // for the table, and the real table info is put to the path afterwards.
                // Table creation therefore shows up as a node changed event.
                if (newData == null) {
                    break;
                }
                TablePath tablePath = TableZNode.parsePath(newData.getPath());
                if (tablePath != null) {
                    processCreateTable(tablePath, newData);
                }
                break;
            }
        case NODE_DELETED:
            {
                // Guard oldData the same way the other branches guard newData, instead
                // of failing with a NullPointerException inside the listener.
                if (oldData == null) {
                    LOG.warn("Received {} event without old node data, ignoring it", type);
                    break;
                }
                // maybe a partition was deleted;
                // try to parse the path as a table partition node
                PhysicalTablePath physicalTablePath =
                        PartitionZNode.parsePath(oldData.getPath());
                if (physicalTablePath != null) {
                    // a table partition node was deleted
                    TablePartition partition = PartitionZNode.decode(oldData.getData());
                    eventManager.put(
                            new DropPartitionEvent(
                                    partition.getTableId(),
                                    partition.getPartitionId(),
                                    physicalTablePath.getPartitionName()));
                    break;
                }
                // maybe the table node was deleted;
                // try to parse the path as a table node
                TablePath tablePath = TableZNode.parsePath(oldData.getPath());
                if (tablePath == null) {
                    break;
                }
                TableRegistration table = TableZNode.decode(oldData.getData());
                TableConfig tableConfig =
                        new TableConfig(Configuration.fromMap(table.properties));
                eventManager.put(
                        new DropTableEvent(
                                table.tableId,
                                tableConfig.getAutoPartitionStrategy().isAutoPartitionEnabled(),
                                tableConfig.isDataLakeEnabled()));
                break;
            }
        default:
            break;
    }
}