int aws_mqtt_topic_tree_transaction_insert()

in source/topic_tree.c [572:680]


/**
 * Stages the insertion of a subscription for `topic_filter_ori` into `tree` as one action created
 * in `transaction`.
 *
 * The filter is copied, split on '/', and the tree is walked from the root one part at a time.
 * Parts that have no node yet get one created and linked into their parent's `subtopics` table.
 * The action remembers the last pre-existing node and the first created node so the work can be
 * undone later.
 *
 * \param tree              Tree to insert into; its allocator is used for the copy and new nodes.
 * \param transaction       List the new action is created in (via s_topic_tree_action_create).
 * \param topic_filter_ori  Filter to subscribe to. It is copied; the caller keeps its own string.
 * \param qos               Stored on the action.
 * \param callback          Stored on the action. Must not be NULL.
 * \param cleanup           Stored on the action.
 * \param userdata          Stored on the action.
 *
 * \return AWS_OP_SUCCESS when the whole path was found or created, AWS_OP_ERR otherwise.
 *         If the failure happens after the action was created, the action is left in the
 *         transaction so that it can be rolled back.
 */
int aws_mqtt_topic_tree_transaction_insert(
    struct aws_mqtt_topic_tree *tree,
    struct aws_array_list *transaction,
    const struct aws_string *topic_filter_ori,
    enum aws_mqtt_qos qos,
    aws_mqtt_publish_received_fn *callback,
    aws_mqtt_userdata_cleanup_fn *cleanup,
    void *userdata) {

    AWS_PRECONDITION(tree);
    AWS_PRECONDITION(transaction);
    AWS_PRECONDITION(topic_filter_ori);
    AWS_PRECONDITION(callback);

    /* let topic tree take the ownership of the new string and leave the caller string alone. */
    struct aws_string *topic_filter = aws_string_new_from_string(tree->allocator, topic_filter_ori);
    if (!topic_filter) {
        /* The copy is dereferenced right below, so a failed allocation must stop here. */
        return AWS_OP_ERR;
    }

    AWS_LOGF_DEBUG(
        AWS_LS_MQTT_TOPIC_TREE,
        "tree=%p: Inserting topic filter %s into topic tree",
        (void *)tree,
        topic_filter->bytes);

    struct aws_mqtt_topic_node *current = tree->root;

    struct topic_tree_action *action = s_topic_tree_action_create(transaction);
    if (!action) {
        /* Nothing refers to the copy yet; release it here or it is leaked. */
        aws_string_destroy(topic_filter);
        return AWS_OP_ERR;
    }

    /* Default to update unless a node was added */
    action->mode = AWS_MQTT_TOPIC_TREE_UPDATE;
    action->qos = qos;
    action->callback = callback;
    action->cleanup = cleanup;
    action->userdata = userdata;

    struct aws_byte_cursor topic_filter_cur = aws_byte_cursor_from_string(topic_filter);
    struct aws_byte_cursor sub_part;
    AWS_ZERO_STRUCT(sub_part);
    struct aws_byte_cursor last_part;
    AWS_ZERO_STRUCT(last_part);
    while (aws_byte_cursor_next_split(&topic_filter_cur, '/', &sub_part)) {

        last_part = sub_part;

        /* Keep the parent around: `current` is overwritten below, and the parent's table is
         * needed again if the new child cannot be created. */
        struct aws_mqtt_topic_node *parent = current;

        /* Add or find mid-node */
        struct aws_hash_element *elem = NULL;
        int was_created = 0;
        if (aws_hash_table_create(&parent->subtopics, &sub_part, &elem, &was_created)) {
            /* No entry was handed back, so there is nothing to undo in the table itself. */
            goto on_partial_insert;
        }

        if (was_created) {
            if (action->mode == AWS_MQTT_TOPIC_TREE_UPDATE) {
                /* Store the last found node */
                action->last_found = parent;
            }

            /* Node does not exist, add new one */
            current = s_topic_node_new(tree->allocator, &sub_part, topic_filter);
            if (!current) {
                /* The entry created above is still keyed by `&sub_part`, a local that dies when
                 * this function returns, and it has no node as its value. Take it back out so the
                 * table never exposes it. */
                aws_hash_table_remove_element(&parent->subtopics, elem);

                /* Don't do handle_error logic, the action needs to persist to be rolled back */
                goto on_partial_insert;
            }

            /* Stash in the hash map */
            elem->key = &current->topic;
            elem->value = current;

            if (action->mode == AWS_MQTT_TOPIC_TREE_UPDATE) {
                AWS_LOGF_TRACE(
                    AWS_LS_MQTT_TOPIC_TREE,
                    "tree=%p: Topic part \"" PRInSTR "\" is new, it and all children will be added as new nodes",
                    (void *)tree,
                    AWS_BYTE_CURSOR_PRI(sub_part));

                /* Store the node we just made, and make sure we don't store again */
                action->mode = AWS_MQTT_TOPIC_TREE_ADD;
                action->first_created = current;
            }
        } else {
            AWS_ASSERT(action->mode == AWS_MQTT_TOPIC_TREE_UPDATE); /* Can't have found an existing node while adding */

            /* If the node exists, just traverse it */
            current = elem->value;
        }
    }

    action->node_to_update = current;

    /* Node found (or created), add the topic filter and callbacks */
    if (current->owns_topic_filter) {

        AWS_LOGF_TRACE(
            AWS_LS_MQTT_TOPIC_TREE,
            "tree=%p node=%p: Updating existing node that already owns its topic_filter, throwing out parameter",
            (void *)tree,
            (void *)current);

        /* If the topic filter was already here, this is already a subscription.
           Free the new topic_filter so all existing byte_cursors remain valid. */
        aws_string_destroy(topic_filter);
    } else {
        /* Node already existed (or was created) but wasn't subscription. */
        action->topic = last_part;
        action->topic_filter = topic_filter;
    }

    return AWS_OP_SUCCESS;

on_partial_insert:
    /* The walk stopped part-way. Decide who is responsible for the string copy. */
    if (action->mode == AWS_MQTT_TOPIC_TREE_ADD) {
        /* Nodes created earlier in this call were built from `topic_filter`, so it must outlive
         * them. Park it on the action, which is the same state a successful insert of new nodes
         * leaves behind. NOTE(review): assumes rolling back an ADD action releases
         * action->topic_filter - confirm against the roll-back code. */
        action->topic_filter = topic_filter;
    } else {
        /* No node was created from the copy, so nothing else can be pointing into it. */
        aws_string_destroy(topic_filter);
    }

    return AWS_OP_ERR;
}