in source/topic_tree.c [331:523]
/*
 * Applies a previously prepared topic tree action to the tree, then destroys the action.
 *
 * ADD / UPDATE: runs the target node's existing cleanup callback on its old userdata (when both are set),
 * then copies callback, cleanup, userdata and qos from the action onto the node and, conditionally, the
 * topic cursor and the topic filter string.
 *
 * REMOVE: clears the target node's subscription data, prunes the nodes recorded in action->to_remove that
 * are neither subscriptions nor have children, and re-points surviving nodes that still reference the
 * target's topic filter string at a replacement string.
 *
 * action: the action to commit; action->node_to_update must be non-NULL (see the precondition below).
 *         It is handed to s_topic_tree_action_destroy() before this function returns.
 * tree:   used as an identifier in log messages and for tree->allocator when destroying nodes.
 */
static void s_topic_tree_action_commit(struct topic_tree_action *action, struct aws_mqtt_topic_tree *tree) {
/* NOTE(review): tree is dereferenced further down (tree->allocator), so this cast appears redundant. */
(void)tree;
AWS_PRECONDITION(action->node_to_update);
switch (action->mode) {
case AWS_MQTT_TOPIC_TREE_ADD:
case AWS_MQTT_TOPIC_TREE_UPDATE: {
AWS_LOGF_TRACE(
AWS_LS_MQTT_TOPIC_TREE,
"tree=%p action=%p: Committing %s topic tree action",
(void *)tree,
(void *)action,
(action->mode == AWS_MQTT_TOPIC_TREE_ADD) ? "add" : "update");
/* Destroy old userdata */
if (action->node_to_update->cleanup && action->node_to_update->userdata) {
/* If there was userdata assigned to this node, pass it out. */
action->node_to_update->cleanup(action->node_to_update->userdata);
}
/* Update data: the node takes over the action's subscription fields. */
action->node_to_update->callback = action->callback;
action->node_to_update->cleanup = action->cleanup;
action->node_to_update->userdata = action->userdata;
action->node_to_update->qos = action->qos;
/* Only overwrite the node's topic cursor when the action carries one. */
if (action->topic.ptr) {
action->node_to_update->topic = action->topic;
}
if (action->topic_filter) {
if (action->node_to_update->owns_topic_filter && action->node_to_update->topic_filter) {
/* The node already owns a topic filter; destroy the new filter so that all existing byte cursors
 * stay valid. */
aws_string_destroy((void *)action->topic_filter);
} else {
/* The node adopts the action's filter string and is marked as its owner. */
action->node_to_update->topic_filter = action->topic_filter;
action->node_to_update->owns_topic_filter = true;
}
}
break;
}
case AWS_MQTT_TOPIC_TREE_REMOVE: {
AWS_LOGF_TRACE(
AWS_LS_MQTT_TOPIC_TREE,
"tree=%p action=%p: Committing remove topic tree action",
(void *)tree,
(void *)action);
struct aws_mqtt_topic_node *current = action->node_to_update;
/* Index of the last entry in to_remove; the pruning loop below treats that entry as current.
 * NOTE(review): this wraps around if to_remove is empty; presumably the list always holds at least one
 * entry (the root) by the time a remove is committed -- confirm against the code that builds the action. */
const size_t sub_parts_len = aws_array_list_length(&action->to_remove) - 1;
/* The precondition above already requires a non-NULL node; presumably this check covers builds in which
 * the precondition macro is not enforced -- confirm. */
if (current) {
/* If found the node, traverse up and remove each with no sub-topics.
 * Then update all nodes that were using current's topic_filter for topic. */
/* "unsubscribe" current. */
if (current->cleanup && current->userdata) {
AWS_LOGF_TRACE(AWS_LS_MQTT_TOPIC_TREE, "node=%p: Cleaning up node's userdata", (void *)current);
/* If there was userdata assigned to this node, pass it out. */
current->cleanup(current->userdata);
}
current->callback = NULL;
current->cleanup = NULL;
current->userdata = NULL;
/* Set to true if current needs to be cleaned up. Its destruction is deferred until after the
 * topic filter fix-up below, which still reads current->topic_filter. */
bool destroy_current = false;
/* How many nodes are left after the great purge. */
size_t nodes_left = sub_parts_len;
/* Remove all subscription-less and child-less nodes, walking from the deepest entry towards index 1.
 * Index 0 is never examined as a removal candidate; it only serves as a parent. */
for (size_t i = sub_parts_len; i > 0; --i) {
struct aws_mqtt_topic_node *node = NULL;
aws_array_list_get_at(&action->to_remove, &node, i);
AWS_ASSUME(node); /* Must be in bounds */
if (!s_topic_node_is_subscription(node) && 0 == aws_hash_table_get_entry_count(&node->subtopics)) {
/* No subscription and no children, this node needs to go. */
/* grandma is the entry immediately before node in to_remove; node is unlinked from its subtopics table. */
struct aws_mqtt_topic_node *grandma = NULL;
aws_array_list_get_at(&action->to_remove, &grandma, i - 1);
AWS_ASSUME(grandma); /* Must be in bounds */
AWS_LOGF_TRACE(
AWS_LS_MQTT_TOPIC_TREE,
"tree=%p node=%p: Removing child node %p with topic \"" PRInSTR "\"",
(void *)tree,
(void *)grandma,
(void *)node,
AWS_BYTE_CURSOR_PRI(node->topic));
aws_hash_table_remove(&grandma->subtopics, &node->topic, NULL, NULL);
/* Make sure the following loop doesn't hit this node. */
--nodes_left;
if (i != sub_parts_len) {
/* Clean up and delete */
s_topic_node_destroy(node, tree->allocator);
} else {
/* This entry is current: only flag it here, it is destroyed after the fix-up below. */
destroy_current = true;
}
} else {
AWS_LOGF_TRACE(
AWS_LS_MQTT_TOPIC_TREE,
"tree=%p: Node %p with topic \"" PRInSTR
"\" has children or is a subscription, leaving in place",
(void *)tree,
(void *)node,
AWS_BYTE_CURSOR_PRI(node->topic));
/* Once we've found one node with children, the rest are guaranteed to. */
break;
}
}
/* If any non-root entries survived the purge, fix up the ones that still point into current's
 * topic filter string. */
if (nodes_left > 0) {
/* If a new viable topic filter is found once, it can be used for all parents. */
const struct aws_string *new_topic_filter = NULL;
const struct aws_string *const old_topic_filter = current->topic_filter;
/* Deepest surviving entry; used to seed topic_offset. */
struct aws_mqtt_topic_node *parent = NULL;
aws_array_list_get_at(&action->to_remove, &parent, nodes_left);
AWS_ASSUME(parent);
/* How much of new_topic_filter should be lopped off the beginning. Seeded with the offset just past
 * the deepest surviving node's topic segment plus one separator byte; each loop iteration first
 * subtracts the visited node's segment (and separator), leaving the offset at which that node's
 * segment starts. */
size_t topic_offset =
parent->topic.ptr - aws_string_bytes(parent->topic_filter) + parent->topic.len + 1;
/* Visit entries nodes_left down to 1. When current was not pruned above, nodes_left equals
 * sub_parts_len, so current itself is the first entry visited. */
for (size_t i = nodes_left; i > 0; --i) {
aws_array_list_get_at(&action->to_remove, &parent, i);
AWS_ASSUME(parent); /* Must be in bounds */
/* Remove this topic and following / from offset. */
topic_offset -= (parent->topic.len + 1);
if (parent->topic_filter == old_topic_filter) {
/* Uh oh, Mom's using my topic string again! Steal it and replace it with a new one, Indiana
 * Jones style. */
AWS_LOGF_TRACE(
AWS_LS_MQTT_TOPIC_TREE,
"tree=%p: Found node %p reusing topic filter part, replacing with next child",
(void *)tree,
(void *)parent);
if (!new_topic_filter) {
/* Set new_tf to old_tf so it's easier to check against the existing node.
 * Basically, it's an INOUT param. */
new_topic_filter = old_topic_filter;
/* Search all subtopics until we find one that isn't current.
 * s_topic_node_string_finder is expected to overwrite new_topic_filter with a different
 * string -- its definition is not visible here, confirm. */
aws_hash_table_foreach(
&parent->subtopics, s_topic_node_string_finder, (void *)&new_topic_filter);
/* This would only happen if there is only one topic in subtopics (current's) and
 * it has no children (in which case it should have been removed above).
 * NOTE(review): if this assertion is compiled out and no replacement was found, the code
 * below destroys the old string and then stores the same, now destroyed, pointer into
 * parent -- confirm that the invariant always holds. */
AWS_ASSERT(new_topic_filter != old_topic_filter);
/* Now that the new string has been found, the old one can be destroyed.
 * NOTE(review): the destroy is not gated on current->owns_topic_filter; presumably current
 * always owns old_topic_filter when this point is reached -- confirm. */
aws_string_destroy((void *)current->topic_filter);
current->owns_topic_filter = false;
}
/* Update the pointers: same offset into the replacement string; topic.len is left unchanged. */
parent->topic_filter = new_topic_filter;
parent->topic.ptr = (uint8_t *)aws_string_bytes(new_topic_filter) + topic_offset;
}
}
}
/* Now that the strings are updated, remove current. */
if (destroy_current) {
s_topic_node_destroy(current, tree->allocator);
}
/* The local is not read again after this point. */
current = NULL;
}
break;
}
}
/* The action is consumed regardless of which mode was committed. */
s_topic_tree_action_destroy(action);
}