void qdra_config_exchange_create_CT()

in src/router_core/exchange_bindings.c [412:532]


void qdra_config_exchange_create_CT(qdr_core_t         *core,
                                    qd_iterator_t      *name,
                                    qdr_query_t        *query,
                                    qd_parsed_field_t  *in_body)
{
    qdr_exchange_t *ex = NULL;

    query->status = QD_AMQP_BAD_REQUEST;

    if (!qd_parse_is_map(in_body)) {
        query->status.description = "Body of request must be a map";
        goto exit;
    }

    if (!name) {
        query->status.description = "exchange requires a unique name";
        goto exit;
    }

    qd_parsed_field_t *address_field = qd_parse_value_by_key(in_body,
                                                             qdr_config_exchange_columns[QDR_CONFIG_EXCHANGE_ADDRESS]);
    if (!address_field) {
        query->status.description = "exchange address is mandatory";
        goto exit;
    }
    qd_iterator_t *address = qd_parse_raw(address_field);

    // check for duplicates
    {
        qdr_exchange_t *eptr = 0;
        for (eptr = DEQ_HEAD(core->exchanges); eptr; eptr = DEQ_NEXT(eptr)) {
            if (qd_iterator_equal(address, eptr->address)) {
                query->status.description = "duplicate exchange address";
                goto exit;
            } else if (qd_iterator_equal(name, eptr->name)) {
                query->status.description = "duplicate exchange name";
                goto exit;
            }
        }
    }

    qd_parsed_field_t *method_field = qd_parse_value_by_key(in_body,
                                                            qdr_config_exchange_columns[QDR_CONFIG_EXCHANGE_MATCH_METHOD]);
    qd_parse_tree_type_t method = QD_PARSE_TREE_AMQP_0_10;
    if (method_field) {
        if (qd_iterator_equal(qd_parse_raw(method_field), (const unsigned char *)"mqtt")) {
            method = QD_PARSE_TREE_MQTT;
        } else if (!qd_iterator_equal(qd_parse_raw(method_field), (const unsigned char *)"amqp")) {
            query->status.description = "Exchange matchMethod must be either 'amqp' or 'mqtt'";
            goto exit;
        }
    }

    long phase = 0;
    qd_parsed_field_t *phase_field = qd_parse_value_by_key(in_body,
                                                           qdr_config_exchange_columns[QDR_CONFIG_EXCHANGE_PHASE]);
    if (phase_field) {
        phase = qd_parse_as_long(phase_field);
        if (phase < 0 || phase > 9) {
            query->status.description = "phase must be in the range 0-9";
            goto exit;
        }
    }

    qd_iterator_t *alternate = NULL;
    long alt_phase = 0;
    qd_parsed_field_t *alternate_field = qd_parse_value_by_key(in_body,
                                                               qdr_config_exchange_columns[QDR_CONFIG_EXCHANGE_ALTERNATE]);
    if (alternate_field) {
        alternate = qd_parse_raw(alternate_field);
        qd_parsed_field_t *alt_phase_field = qd_parse_value_by_key(in_body,
                                                                   qdr_config_exchange_columns[QDR_CONFIG_EXCHANGE_ALT_PHASE]);
        if (alt_phase_field) {
            alt_phase = qd_parse_as_long(alt_phase_field);
            if (alt_phase < 0 || alt_phase > 9) {
                query->status.description = "phase must be in the range 0-9";
                goto exit;
            }
        }
    }

    ex = qdr_exchange(core, name, address, phase, alternate, alt_phase, method);
    if (ex) {
        // @TODO(kgiusti) - for now, until the behavior is nailed down:
        static int warn_user;
        if (!warn_user) {
            warn_user = 1;
            qd_log(core->agent_log, QD_LOG_WARNING,
                   "The Exchange/Binding feature is currently EXPERIMENTAL."
                   " Its functionality may change in future releases"
                   " of the Qpid Dispatch Router. Backward compatibility is"
                   " not guaranteed.");
        }
        query->status = QD_AMQP_CREATED;
        if (query->body) {
            write_config_exchange_map(ex, query->body);
        }
    } else {
        query->status.description = "failed to allocate exchange";
    }

 exit:

    if (query->status.status == QD_AMQP_CREATED.status) {
        qd_log(core->agent_log, QD_LOG_DEBUG,
               "Exchange %s CREATED (id=%"PRIu64")", ex->name, ex->identity);

    } else {
        qd_log(core->agent_log, QD_LOG_ERROR,
               "Error performing CREATE of %s: %s", config_exchange_entity_type, query->status.description);
        // return a NULL body:
        if (query->body) qd_compose_insert_null(query->body);
    }

    if (query->body) {
        qdr_agent_enqueue_response_CT(core, query);
    } else {
        // no body == create from internal config parser
        qdr_query_free(query);
    }
}