in source/topic_tree.c [572:680]
/*
 * Stages the insertion of a subscription into `tree` as part of `transaction`.
 *
 * A private copy of `topic_filter_ori` is made, so the caller keeps ownership of its own string.
 * The filter is split on '/', and for each part the matching child of the current node is looked up in
 * `subtopics`, or created if it is missing. The first node created by this call is remembered in
 * `action->first_created` (and its parent in `action->last_found`), and the action switches from
 * AWS_MQTT_TOPIC_TREE_UPDATE to AWS_MQTT_TOPIC_TREE_ADD. The node reached at the end of the walk is stored in
 * `action->node_to_update`, together with `qos`, `callback`, `cleanup` and `userdata`.
 *
 * If the final node already owns a topic filter, the private copy is destroyed; otherwise the copy and the last
 * path segment are handed to the action (`action->topic_filter`, `action->topic`).
 *
 * Returns AWS_OP_SUCCESS on success and AWS_OP_ERR on failure. Once the action has been created, a failure leaves
 * it in `transaction`; the caller is presumably expected to roll the transaction back (see the comments on the
 * error paths below).
 */
int aws_mqtt_topic_tree_transaction_insert(
    struct aws_mqtt_topic_tree *tree,
    struct aws_array_list *transaction,
    const struct aws_string *topic_filter_ori,
    enum aws_mqtt_qos qos,
    aws_mqtt_publish_received_fn *callback,
    aws_mqtt_userdata_cleanup_fn *cleanup,
    void *userdata) {

    AWS_PRECONDITION(tree);
    AWS_PRECONDITION(transaction);
    AWS_PRECONDITION(topic_filter_ori);
    AWS_PRECONDITION(callback);

    /* let topic tree take the ownership of the new string and leave the caller string alone. */
    struct aws_string *topic_filter = aws_string_new_from_string(tree->allocator, topic_filter_ori);
    if (!topic_filter) {
        /* Nothing has been staged yet, so there is nothing to undo. */
        return AWS_OP_ERR;
    }

    AWS_LOGF_DEBUG(
        AWS_LS_MQTT_TOPIC_TREE,
        "tree=%p: Inserting topic filter %s into topic tree",
        (void *)tree,
        topic_filter->bytes);

    struct aws_mqtt_topic_node *current = tree->root;

    struct topic_tree_action *action = s_topic_tree_action_create(transaction);
    if (!action) {
        /* No action and no node refers to the copy yet; release it so it does not leak. */
        aws_string_destroy(topic_filter);
        return AWS_OP_ERR;
    }

    /* Default to update unless a node was added */
    action->mode = AWS_MQTT_TOPIC_TREE_UPDATE;
    action->qos = qos;
    action->callback = callback;
    action->cleanup = cleanup;
    action->userdata = userdata;

    /* The cursors below point into topic_filter's bytes, not into the caller's string. */
    struct aws_byte_cursor topic_filter_cur = aws_byte_cursor_from_string(topic_filter);
    struct aws_byte_cursor sub_part;
    AWS_ZERO_STRUCT(sub_part);
    struct aws_byte_cursor last_part;
    AWS_ZERO_STRUCT(last_part);

    while (aws_byte_cursor_next_split(&topic_filter_cur, '/', &sub_part)) {
        last_part = sub_part;

        /* Add or find mid-node */
        struct aws_hash_element *elem = NULL;
        int was_created = 0;
        if (aws_hash_table_create(&current->subtopics, &sub_part, &elem, &was_created) != AWS_OP_SUCCESS) {
            /* elem is not usable here; bail out instead of dereferencing it. */
            if (action->mode == AWS_MQTT_TOPIC_TREE_UPDATE) {
                /* This call has not created any node, so nothing else refers to the copy. */
                aws_string_destroy(topic_filter);
            }
            /* In ADD mode the nodes created above were given topic_filter and may still reference its bytes,
             * so it is deliberately not released here.
             * Don't do handle_error logic, the action needs to persist to be rolled back */
            return AWS_OP_ERR;
        }

        if (was_created) {
            if (action->mode == AWS_MQTT_TOPIC_TREE_UPDATE) {
                /* Store the last found node */
                action->last_found = current;
            }

            /* Node does not exist, add new one */
            current = s_topic_node_new(tree->allocator, &sub_part, topic_filter);
            if (!current) {
                /* Don't do handle_error logic, the action needs to persist to be rolled back */
                /* NOTE(review): the element just inserted still has &sub_part (a stack address) as its key and no
                 * value, and topic_filter is not released on this path - confirm that rollback covers both. */
                return AWS_OP_ERR;
            }

            /* Stash in the hash map; the key now points at the node's own cursor instead of the local sub_part */
            elem->key = &current->topic;
            elem->value = current;

            if (action->mode == AWS_MQTT_TOPIC_TREE_UPDATE) {
                AWS_LOGF_TRACE(
                    AWS_LS_MQTT_TOPIC_TREE,
                    "tree=%p: Topic part \"" PRInSTR "\" is new, it and all children will be added as new nodes",
                    (void *)tree,
                    AWS_BYTE_CURSOR_PRI(sub_part));

                /* Store the node we just made, and make sure we don't store again */
                action->mode = AWS_MQTT_TOPIC_TREE_ADD;
                action->first_created = current;
            }
        } else {
            AWS_ASSERT(action->mode == AWS_MQTT_TOPIC_TREE_UPDATE); /* Can't have found an existing node while adding */

            /* If the node exists, just traverse it */
            current = elem->value;
        }
    }

    action->node_to_update = current;

    /* Node found (or created), add the topic filter and callbacks */
    if (current->owns_topic_filter) {
        AWS_LOGF_TRACE(
            AWS_LS_MQTT_TOPIC_TREE,
            "tree=%p node=%p: Updating existing node that already owns its topic_filter, throwing out parameter",
            (void *)tree,
            (void *)current);

        /* If the topic filter was already here, this is already a subscription.
           Free the new topic_filter so all existing byte_cursors remain valid. */
        aws_string_destroy(topic_filter);
    } else {
        /* Node already existed (or was created) but wasn't subscription. */
        action->topic = last_part;
        action->topic_filter = topic_filter;
    }

    return AWS_OP_SUCCESS;
}