in src/router_core/agent_config_address.c [336:528]
/**
 * Mark a config-address CREATE request as failed and log the reason.
 *
 * Sets the query status to QD_AMQP_BAD_REQUEST, stores `description` as the
 * status description (the pointer is stored, not a copy) and writes an error
 * line to the agent log.
 *
 * @param core        Router core; supplies the agent log.
 * @param query       Query whose status is overwritten.
 * @param description Human-readable reason for the failure.
 */
static void qdra_config_address_create_fail_CT(qdr_core_t  *core,
                                               qdr_query_t *query,
                                               const char  *description)
{
    query->status = QD_AMQP_BAD_REQUEST;
    query->status.description = description;
    qd_log(core->agent_log, QD_LOG_ERROR, "Error performing CREATE of %s: %s", CONFIG_ADDRESS_TYPE, query->status.description);
}


/**
 * Handle a management CREATE request for a config-address entity.
 *
 * Validates the request, registers the new address pattern in the core's
 * parse tree, appends the new configuration to core->addr_config and, when a
 * name was supplied, indexes it in core->addr_lr_al_hash.
 *
 * The request is rejected with QD_AMQP_BAD_REQUEST when:
 *   - the name is already present in the hash,
 *   - the body is not a map,
 *   - neither or both of 'prefix' and 'pattern' are given,
 *   - fallback is combined with waypoint or a positive phase,
 *   - the pattern/prefix fails validation,
 *   - a phase is outside 0..9,
 *   - the priority is outside -1..QDR_MAX_PRIORITY (-1 is the value used when
 *     the attribute is absent),
 *   - allocation fails or the pattern cannot be added to the parse tree.
 * On success the status is QD_AMQP_CREATED.
 *
 * @param core    Router core that owns the address configuration state.
 * @param name    Iterator over the entity name, or NULL for an unnamed entity.
 *                Its view is temporarily changed and restored; a prefix
 *                annotation is applied to it.
 * @param query   Query receiving the status and, if query->body is set, the
 *                composed response. Before returning, the query is either
 *                passed to qdr_agent_enqueue_response_CT (body present) or
 *                freed (no body), so the caller must not touch it afterwards.
 * @param in_body Parsed request body; expected to be a map of attributes.
 */
void qdra_config_address_create_CT(qdr_core_t        *core,
                                   qd_iterator_t     *name,
                                   qdr_query_t       *query,
                                   qd_parsed_field_t *in_body)
{
    // Owned by this function until it is handed over to the new address config.
    char *pattern = NULL;

    // Single-pass block: every failure path breaks out to the common epilogue.
    do {
        //
        // Ensure there isn't a duplicate name
        //
        qdr_address_config_t *addr = NULL;
        if (name) {
            // Look the name up under the config-address prefix using the
            // address-hash view, then restore the view the caller had.
            qd_iterator_view_t iter_view = qd_iterator_get_view(name);
            qd_iterator_annotate_prefix(name, CONFIG_ADDRESS_PREFIX);
            qd_iterator_reset_view(name, ITER_VIEW_ADDRESS_HASH);
            qd_hash_retrieve(core->addr_lr_al_hash, name, (void**) &addr);
            qd_iterator_reset_view(name, iter_view);
        }

        if (addr) {
            qdra_config_address_create_fail_CT(core, query, "Name conflicts with an existing entity");
            break;
        }

        // Ensure that the body is a map
        if (!qd_parse_is_map(in_body)) {
            qdra_config_address_create_fail_CT(core, query, "Body of request must be a map");
            break;
        }

        //
        // Extract the fields from the request
        //
        qd_parsed_field_t *prefix_field    = qd_parse_value_by_key(in_body, qdr_config_address_columns[QDR_CONFIG_ADDRESS_PREFIX]);
        qd_parsed_field_t *pattern_field   = qd_parse_value_by_key(in_body, qdr_config_address_columns[QDR_CONFIG_ADDRESS_PATTERN]);
        qd_parsed_field_t *distrib_field   = qd_parse_value_by_key(in_body, qdr_config_address_columns[QDR_CONFIG_ADDRESS_DISTRIBUTION]);
        qd_parsed_field_t *waypoint_field  = qd_parse_value_by_key(in_body, qdr_config_address_columns[QDR_CONFIG_ADDRESS_WAYPOINT]);
        qd_parsed_field_t *in_phase_field  = qd_parse_value_by_key(in_body, qdr_config_address_columns[QDR_CONFIG_ADDRESS_IN_PHASE]);
        qd_parsed_field_t *out_phase_field = qd_parse_value_by_key(in_body, qdr_config_address_columns[QDR_CONFIG_ADDRESS_OUT_PHASE]);
        qd_parsed_field_t *priority_field  = qd_parse_value_by_key(in_body, qdr_config_address_columns[QDR_CONFIG_ADDRESS_PRIORITY]);
        qd_parsed_field_t *fallback_field  = qd_parse_value_by_key(in_body, qdr_config_address_columns[QDR_CONFIG_ADDRESS_FALLBACK]);

        // Absent attributes default to false for booleans and -1 for numbers.
        bool waypoint  = waypoint_field  ? qd_parse_as_bool(waypoint_field)  : false;
        long in_phase  = in_phase_field  ? qd_parse_as_long(in_phase_field)  : -1;
        long out_phase = out_phase_field ? qd_parse_as_long(out_phase_field) : -1;
        long priority  = priority_field  ? qd_parse_as_long(priority_field)  : -1;
        bool fallback  = fallback_field  ? qd_parse_as_bool(fallback_field)  : false;

        //
        // Either a prefix or a pattern field is mandatory.  Prefix and pattern
        // are mutually exclusive.  Fail if either both or none are given.
        //
        const char *msg = NULL;
        if (!prefix_field && !pattern_field) {
            msg = "Either a 'prefix' or 'pattern' attribute must be provided";
        } else if (prefix_field && pattern_field) {
            msg = "Cannot specify both a 'prefix' and a 'pattern' attribute";
        }

        // Checked last, so this message wins if both problems are present.
        if (fallback && (waypoint || in_phase > 0 || out_phase > 0)) {
            msg = "Fallback cannot be specified with waypoint or non-zero ingress and egress phases";
        }

        if (msg) {
            qdra_config_address_create_fail_CT(core, query, msg);
            break;
        }

        // validate the pattern/prefix, add "/#" if prefix
        pattern = qdra_config_address_validate_pattern_CT((prefix_field) ? prefix_field : pattern_field,
                                                          !!prefix_field,
                                                          &msg);
        if (!pattern) {
            // On failure the validator supplies the reason through msg.
            qdra_config_address_create_fail_CT(core, query, msg);
            break;
        }

        //
        // Handle the address-phasing logic.  If the phases are provided, use them.  Otherwise
        // use the waypoint flag to set the most common defaults.
        //
        if (in_phase == -1 && out_phase == -1) {
            in_phase  = 0;
            out_phase = waypoint ? 1 : 0;
        }

        //
        // Validate the phase values.  A request that supplies only one of the
        // two phases leaves the other at -1 and is rejected here.
        //
        if (in_phase < 0 || in_phase > 9 || out_phase < 0 || out_phase > 9) {
            qdra_config_address_create_fail_CT(core, query, "Phase values must be between 0 and 9");
            break;
        }

        //
        // Validate the priority value.  -1 is the "not specified" default set
        // above and remains acceptable; anything lower, or anything above
        // QDR_MAX_PRIORITY, is out of range.
        //
        if (priority < -1 || priority > QDR_MAX_PRIORITY) {
            qdra_config_address_create_fail_CT(core, query, "Priority value, if present, must be between 0 and QDR_MAX_PRIORITY");
            break;
        }

        //
        // The request is valid.  Attempt to insert the address pattern into
        // the parse tree, fail if there is already an entry for that pattern
        //
        addr = new_qdr_address_config_t();
        if (!addr) {
            qdra_config_address_create_fail_CT(core, query, "Out of memory");
            break;
        }
        ZERO(addr);

        //
        // Insert the uninitialized address to check if it already exists in
        // the parse tree.  On success initialize it.  This is thread safe
        // since the current thread (core) is the only thread allowed to use
        // the parse tree
        //
        qd_error_t rc = qd_parse_tree_add_pattern_str(core->addr_parse_tree, pattern, addr);
        if (rc) {
            // pattern is still owned locally and is released in the epilogue.
            free_qdr_address_config_t(addr);
            qdra_config_address_create_fail_CT(core, query, qd_error_name(rc));
            break;
        }

        addr->ref_count = 1; // Represents the reference from the addr_config list
        addr->name      = name ? (char*) qd_iterator_copy(name) : NULL;
        addr->identity  = qdr_identifier(core);
        addr->treatment = qdra_address_treatment_CT(distrib_field);
        addr->in_phase  = in_phase;
        addr->out_phase = out_phase;
        addr->is_prefix = !!prefix_field;
        addr->pattern   = pattern;
        addr->priority  = priority;
        addr->fallback  = fallback;

        // Ownership of the string moved to addr; keep the epilogue from freeing it.
        pattern = NULL;

        DEQ_INSERT_TAIL(core->addr_config, addr);
        if (name) {
            // Index by name using the address-hash view, then restore the view.
            // NOTE(review): the result of qd_hash_insert is not checked.
            qd_iterator_view_t iter_view = qd_iterator_get_view(name);
            qd_iterator_reset_view(name, ITER_VIEW_ADDRESS_HASH);
            qd_hash_insert(core->addr_lr_al_hash, name, addr, &addr->hash_handle);
            qd_iterator_reset_view(name, iter_view);
        }

        //
        // Compose the result map for the response.
        //
        if (query->body) {
            qd_compose_start_map(query->body);
            for (int col = 0; col < QDR_CONFIG_ADDRESS_COLUMN_COUNT; col++) {
                qdr_config_address_insert_column_CT(addr, col, query->body, true);
            }
            qd_compose_end_map(query->body);
        }

        query->status = QD_AMQP_CREATED;
    } while (false);

    //
    // Enqueue the response if there is a body.  If there is no body, this is a management
    // operation created internally by the configuration file parser.
    //
    if (query->body) {
        //
        // If there was an error in processing the create, insert a NULL value into the body.
        //
        if (query->status.status / 100 > 2) {
            qd_compose_insert_null(query->body);
        }
        qdr_agent_enqueue_response_CT(core, query);
    } else {
        qdr_query_free(query);
    }

    // No-op on success (pattern was handed to addr and reset to NULL).
    free(pattern);
}