in src/router_core/exchange_bindings.c [412:532]
/**
 * Handle a CREATE request for a configured exchange.
 *
 * The request is validated step by step; the first failed check leaves
 * query->status at QD_AMQP_BAD_REQUEST with a description of the problem:
 *   - in_body must be a map and name must be non-NULL,
 *   - the address attribute is mandatory,
 *   - neither the address nor the name may match an entry already present
 *     in core->exchanges,
 *   - the optional match method must be "amqp" or "mqtt"
 *     (QD_PARSE_TREE_AMQP_0_10 is used when the attribute is absent),
 *   - the optional phase must be in 0..9,
 *   - the optional alternate phase must be in 0..9; it is only looked up
 *     when an alternate is supplied.
 *
 * On success the exchange is created with qdr_exchange(), the status becomes
 * QD_AMQP_CREATED and, when query->body is set, the new exchange is written
 * to it. On failure a null body is inserted instead (again only when
 * query->body is set).
 *
 * @param core     Router core; owns the exchange list and the agent log.
 * @param name     Name for the new exchange; the request is rejected if NULL.
 * @param query    Management query. It is always consumed: queued as the
 *                 response when it has a body, freed otherwise.
 * @param in_body  Parsed request body holding the exchange attributes.
 */
void qdra_config_exchange_create_CT(qdr_core_t *core,
                                    qd_iterator_t *name,
                                    qdr_query_t *query,
                                    qd_parsed_field_t *in_body)
{
    qdr_exchange_t       *ex           = NULL;
    // Values used when the corresponding optional attribute is not supplied.
    qd_parse_tree_type_t  match_method = QD_PARSE_TREE_AMQP_0_10;
    qd_iterator_t        *alt_address  = NULL;
    long                  ex_phase     = 0;
    long                  alt_phase    = 0;
    // Assigned after the exit label (a label must be followed by a statement).
    int                   create_failed;

    // Start out pessimistic: every early exit below only has to fill in the
    // description.
    query->status = QD_AMQP_BAD_REQUEST;

    if (!qd_parse_is_map(in_body)) {
        query->status.description = "Body of request must be a map";
        goto exit;
    }

    if (!name) {
        query->status.description = "exchange requires a unique name";
        goto exit;
    }

    qd_parsed_field_t *addr_field =
        qd_parse_value_by_key(in_body, qdr_config_exchange_columns[QDR_CONFIG_EXCHANGE_ADDRESS]);
    if (!addr_field) {
        query->status.description = "exchange address is mandatory";
        goto exit;
    }
    qd_iterator_t *addr = qd_parse_raw(addr_field);

    // Reject the request if an existing exchange already uses this address
    // or this name (address is checked first for each entry).
    qdr_exchange_t *other = DEQ_HEAD(core->exchanges);
    while (other) {
        if (qd_iterator_equal(addr, other->address)) {
            query->status.description = "duplicate exchange address";
            goto exit;
        }
        if (qd_iterator_equal(name, other->name)) {
            query->status.description = "duplicate exchange name";
            goto exit;
        }
        other = DEQ_NEXT(other);
    }

    // Optional match method: "mqtt" switches the parse tree type, "amqp"
    // keeps the default, anything else is an error.
    qd_parsed_field_t *method_field =
        qd_parse_value_by_key(in_body, qdr_config_exchange_columns[QDR_CONFIG_EXCHANGE_MATCH_METHOD]);
    if (method_field) {
        if (qd_iterator_equal(qd_parse_raw(method_field), (const unsigned char *)"mqtt")) {
            match_method = QD_PARSE_TREE_MQTT;
        } else if (!qd_iterator_equal(qd_parse_raw(method_field), (const unsigned char *)"amqp")) {
            query->status.description = "Exchange matchMethod must be either 'amqp' or 'mqtt'";
            goto exit;
        }
    }

    // Optional phase, limited to a single digit.
    qd_parsed_field_t *phase_field =
        qd_parse_value_by_key(in_body, qdr_config_exchange_columns[QDR_CONFIG_EXCHANGE_PHASE]);
    if (phase_field) {
        ex_phase = qd_parse_as_long(phase_field);
        if (!(ex_phase >= 0 && ex_phase <= 9)) {
            query->status.description = "phase must be in the range 0-9";
            goto exit;
        }
    }

    // Optional alternate; its phase is only consulted when the alternate
    // itself is present.
    qd_parsed_field_t *alt_field =
        qd_parse_value_by_key(in_body, qdr_config_exchange_columns[QDR_CONFIG_EXCHANGE_ALTERNATE]);
    if (alt_field) {
        alt_address = qd_parse_raw(alt_field);

        qd_parsed_field_t *alt_phase_field =
            qd_parse_value_by_key(in_body, qdr_config_exchange_columns[QDR_CONFIG_EXCHANGE_ALT_PHASE]);
        if (alt_phase_field) {
            alt_phase = qd_parse_as_long(alt_phase_field);
            if (!(alt_phase >= 0 && alt_phase <= 9)) {
                // NOTE(review): same text as the primary phase check above, so
                // the two failures cannot be told apart from the description.
                query->status.description = "phase must be in the range 0-9";
                goto exit;
            }
        }
    }

    ex = qdr_exchange(core, name, addr, ex_phase, alt_address, alt_phase, match_method);
    if (!ex) {
        query->status.description = "failed to allocate exchange";
        goto exit;
    }

    {
        // @TODO(kgiusti) - for now, until the behavior is nailed down:
        // emit the warning only on the first successful create.
        static int warned;
        if (!warned) {
            warned = 1;
            qd_log(core->agent_log, QD_LOG_WARNING,
                   "The Exchange/Binding feature is currently EXPERIMENTAL."
                   " Its functionality may change in future releases"
                   " of the Qpid Dispatch Router.  Backward compatibility is"
                   " not guaranteed.");
        }
    }

    query->status = QD_AMQP_CREATED;
    if (query->body) {
        write_config_exchange_map(ex, query->body);
    }

exit:
    // ex is only dereferenced on the CREATED path, where it is non-NULL.
    create_failed = (query->status.status != QD_AMQP_CREATED.status);
    if (create_failed) {
        qd_log(core->agent_log, QD_LOG_ERROR,
               "Error performing CREATE of %s: %s", config_exchange_entity_type, query->status.description);
    } else {
        qd_log(core->agent_log, QD_LOG_DEBUG,
               "Exchange %s CREATED (id=%"PRIu64")", ex->name, ex->identity);
    }

    if (!query->body) {
        // no body == create from internal config parser
        qdr_query_free(query);
        return;
    }

    if (create_failed) {
        // return a NULL body:
        qd_compose_insert_null(query->body);
    }
    qdr_agent_enqueue_response_CT(core, query);
}