void qdra_config_address_create_CT()

in src/router_core/agent_config_address.c [336:528]


/**
 * Record a failed CREATE on the query and log it.
 *
 * Sets the query status to QD_AMQP_BAD_REQUEST, overrides the status
 * description with the supplied text and writes an error-level entry to the
 * agent log.
 *
 * @param core        Router core; only core->agent_log is used.
 * @param query       Query whose status is overwritten.
 * @param description Reason for the failure.  The pointer is stored in
 *                    query->status.description as-is (not copied).
 */
static void qdra_config_address_create_fail_CT(qdr_core_t  *core,
                                               qdr_query_t *query,
                                               const char  *description)
{
    query->status = QD_AMQP_BAD_REQUEST;
    query->status.description = description;
    qd_log(core->agent_log, QD_LOG_ERROR, "Error performing CREATE of %s: %s", CONFIG_ADDRESS_TYPE, query->status.description);
}


/**
 * Handle a management CREATE request for a configured address.
 *
 * Validates the request, inserts the address pattern into the core's address
 * parse tree, appends the new configuration to core->addr_config and, when a
 * name is supplied, registers it in core->addr_lr_al_hash.  The outcome is
 * written to query->status.
 *
 * The query is always consumed before returning: if query->body is set the
 * response is enqueued (with a null value inserted in the body on error),
 * otherwise the query is freed.
 *
 * @param core    Router core that owns the hash, parse tree and config list.
 * @param name    Optional name of the new entity (may be NULL).  Its view is
 *                changed temporarily and restored; the prefix annotation set
 *                for the duplicate lookup is left in place.
 * @param query   Query that receives the status and, if it has a body, the
 *                composed attribute map of the created entity.
 * @param in_body Parsed request body; must be a map containing either a
 *                'prefix' or a 'pattern' attribute (not both).
 */
void qdra_config_address_create_CT(qdr_core_t         *core,
                                   qd_iterator_t      *name,
                                   qdr_query_t        *query,
                                   qd_parsed_field_t  *in_body)
{
    // Owned by this function until it is handed over to the new address
    // configuration; whatever is left here is released at the end.
    char *pattern = NULL;

    // Single-pass loop: every validation failure leaves via 'break' so that
    // the common response/cleanup code below always runs.
    while (true) {
        //
        // Ensure there isn't a duplicate name
        //
        qdr_address_config_t *addr = NULL;
        if (name) {
            qd_iterator_view_t iter_view = qd_iterator_get_view(name);
            qd_iterator_annotate_prefix(name, CONFIG_ADDRESS_PREFIX);
            qd_iterator_reset_view(name, ITER_VIEW_ADDRESS_HASH);
            qd_hash_retrieve(core->addr_lr_al_hash, name, (void**) &addr);
            qd_iterator_reset_view(name, iter_view);
        }

        if (addr) {
            qdra_config_address_create_fail_CT(core, query, "Name conflicts with an existing entity");
            break;
        }

        // Ensure that the body is a map
        if (!qd_parse_is_map(in_body)) {
            qdra_config_address_create_fail_CT(core, query, "Body of request must be a map");
            break;
        }

        //
        // Extract the fields from the request
        //
        qd_parsed_field_t *prefix_field    = qd_parse_value_by_key(in_body, qdr_config_address_columns[QDR_CONFIG_ADDRESS_PREFIX]);
        qd_parsed_field_t *pattern_field   = qd_parse_value_by_key(in_body, qdr_config_address_columns[QDR_CONFIG_ADDRESS_PATTERN]);
        qd_parsed_field_t *distrib_field   = qd_parse_value_by_key(in_body, qdr_config_address_columns[QDR_CONFIG_ADDRESS_DISTRIBUTION]);
        qd_parsed_field_t *waypoint_field  = qd_parse_value_by_key(in_body, qdr_config_address_columns[QDR_CONFIG_ADDRESS_WAYPOINT]);
        qd_parsed_field_t *in_phase_field  = qd_parse_value_by_key(in_body, qdr_config_address_columns[QDR_CONFIG_ADDRESS_IN_PHASE]);
        qd_parsed_field_t *out_phase_field = qd_parse_value_by_key(in_body, qdr_config_address_columns[QDR_CONFIG_ADDRESS_OUT_PHASE]);
        qd_parsed_field_t *priority_field  = qd_parse_value_by_key(in_body, qdr_config_address_columns[QDR_CONFIG_ADDRESS_PRIORITY]);
        qd_parsed_field_t *fallback_field  = qd_parse_value_by_key(in_body, qdr_config_address_columns[QDR_CONFIG_ADDRESS_FALLBACK]);

        // Absent numeric fields are represented by the sentinel -1,
        // absent boolean fields default to false.
        bool waypoint  = waypoint_field  ? qd_parse_as_bool(waypoint_field)  : false;
        long in_phase  = in_phase_field  ? qd_parse_as_long(in_phase_field)  : -1;
        long out_phase = out_phase_field ? qd_parse_as_long(out_phase_field) : -1;
        long priority  = priority_field  ? qd_parse_as_long(priority_field)  : -1;
        bool fallback  = fallback_field  ? qd_parse_as_bool(fallback_field)  : false;

        //
        // Either a prefix or a pattern field is mandatory.  Prefix and pattern
        // are mutually exclusive. Fail if either both or none are given.
        //
        const char *msg = NULL;
        if (!prefix_field && !pattern_field) {
            msg = "Either a 'prefix' or 'pattern' attribute must be provided";
        } else if (prefix_field && pattern_field) {
            msg = "Cannot specify both a 'prefix' and a 'pattern' attribute";
        }

        // Note: when both checks fail, the fallback message takes precedence.
        if (fallback && (waypoint || in_phase > 0 || out_phase > 0)) {
            msg = "Fallback cannot be specified with waypoint or non-zero ingress and egress phases";
        }

        if (msg) {
            qdra_config_address_create_fail_CT(core, query, msg);
            break;
        }

        // validate the pattern/prefix, add "/#" if prefix
        pattern = qdra_config_address_validate_pattern_CT((prefix_field) ? prefix_field : pattern_field,
                                                          !!prefix_field,
                                                          &msg);
        if (!pattern) {
            qdra_config_address_create_fail_CT(core, query, msg);
            break;
        }

        //
        // Handle the address-phasing logic.  If the phases are provided, use them.  Otherwise
        // use the waypoint flag to set the most common defaults.
        //
        if (in_phase == -1 && out_phase == -1) {
            in_phase  = 0;
            out_phase = waypoint ? 1 : 0;
        }

        //
        // Validate the phase values.  If only one of the two phases was
        // supplied, the other is still -1 here and the request is rejected.
        //
        if (in_phase < 0 || in_phase > 9 || out_phase < 0 || out_phase > 9) {
            qdra_config_address_create_fail_CT(core, query, "Phase values must be between 0 and 9");
            break;
        }

        //
        // Validate the priority values.  -1 is the "not specified" sentinel
        // assigned above when the field is absent, so it stays acceptable;
        // any other negative value is outside the documented range.
        //
        if (priority < -1 || priority > QDR_MAX_PRIORITY) {
            qdra_config_address_create_fail_CT(core, query, "Priority value, if present, must be between 0 and QDR_MAX_PRIORITY");
            break;
        }

        //
        // The request is valid.  Attempt to insert the address pattern into
        // the parse tree, fail if there is already an entry for that pattern
        //
        addr = new_qdr_address_config_t();
        if (!addr) {
            qdra_config_address_create_fail_CT(core, query, "Out of memory");
            break;
        }
        ZERO(addr);

        //
        // Insert the uninitialized address to check if it already exists in
        // the parse tree.  On success initialize it.  This is thread safe
        // since the current thread (core) is the only thread allowed to use
        // the parse tree
        //

        qd_error_t rc = qd_parse_tree_add_pattern_str(core->addr_parse_tree, pattern, addr);
        if (rc) {
            // 'pattern' is still owned locally and is released at the end.
            free_qdr_address_config_t(addr);
            qdra_config_address_create_fail_CT(core, query, qd_error_name(rc));
            break;
        }

        addr->ref_count = 1; // Represents the reference from the addr_config list
        addr->name      = name ? (char*) qd_iterator_copy(name) : 0;
        addr->identity  = qdr_identifier(core);
        addr->treatment = qdra_address_treatment_CT(distrib_field);
        addr->in_phase  = in_phase;
        addr->out_phase = out_phase;
        addr->is_prefix = !!prefix_field;
        addr->pattern   = pattern;
        addr->priority  = priority;
        addr->fallback  = fallback;
        pattern = NULL;  // ownership moved to addr; prevents the free() below

        DEQ_INSERT_TAIL(core->addr_config, addr);
        if (name) {
            qd_iterator_view_t iter_view = qd_iterator_get_view(name);
            qd_iterator_reset_view(name, ITER_VIEW_ADDRESS_HASH);
            qd_hash_insert(core->addr_lr_al_hash, name, addr, &addr->hash_handle);
            qd_iterator_reset_view(name, iter_view);
        }

        //
        // Compose the result map for the response.
        //
        if (query->body) {
            qd_compose_start_map(query->body);
            for (int col = 0; col < QDR_CONFIG_ADDRESS_COLUMN_COUNT; col++)
                qdr_config_address_insert_column_CT(addr, col, query->body, true);
            qd_compose_end_map(query->body);
        }

        query->status = QD_AMQP_CREATED;
        break;
    }

    //
    // Enqueue the response if there is a body. If there is no body, this is a management
    // operation created internally by the configuration file parser.
    //
    if (query->body) {
        //
        // If there was an error in processing the create, insert a NULL value into the body.
        //
        if (query->status.status / 100 > 2)
            qd_compose_insert_null(query->body);
        qdr_agent_enqueue_response_CT(core, query);
    } else
        qdr_query_free(query);

    free(pattern);
}