in src/main/java/com/amazonaws/services/kinesis/aggregators/app/QueryByLabelServlet.java [91:170]
protected void doAction(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException {
String namespace = request.getParameter(NAMESPACE_PARAM);
String labelValue = request.getParameter(LABEL_VALUE_PARAM);
String dateValue = request.getParameter(DATE_VALUE_PARAM);
String operator = request.getParameter(OPERATOR_PARAM);
// have to provide namespace and label
if (namespace == null) {
doError(response, String.format("Argument '%s' must not be null", NAMESPACE_PARAM));
return;
}
if (labelValue == null || labelValue.equals("")) {
doError(response, String.format("Argument '%s' must not be null", LABEL_VALUE_PARAM));
return;
}
// if date value is provided, the so too must operator and granularity
ComparisonOperator setOperator = null;
if (dateValue != null && operator == null) {
setOperator = ComparisonOperator.EQ;
}
if (operator != null) {
try {
setOperator = ComparisonOperator.fromValue(operator);
} catch (Exception e) {
doError(response, String.format("%s is an invalid Comparison Operator", operator));
return;
}
}
String streamName = (String) request.getServletContext().getAttribute(
AggregatorsConstants.STREAM_NAME_PARAM);
AggregatorGroup aggGroup = (AggregatorGroup) request.getServletContext().getAttribute(
AggregatorsBeanstalkApp.AGGREGATOR_GROUP_PARAM);
if (aggGroup == null) {
doError(response, "Aggregator Application Not Initialised");
return;
} else {
// initialise the aggregator group onto shard 'none' for this
// operation
// - it may already be initialised
try {
aggGroup.initialize("none");
} catch (Exception e) {
throw new ServletException(e);
}
}
Date dateValueAsDate = null;
if (dateValue != null) {
try {
dateValueAsDate = StreamAggregator.dateFormatter.parse(dateValue);
} catch (ParseException e1) {
throw new ServletException(e1);
}
}
// put the initialised aggregator group back into the context
request.getServletContext().setAttribute(AggregatorsBeanstalkApp.AGGREGATOR_GROUP_PARAM,
aggGroup);
// acquire the correct aggregator by namespace
for (StreamAggregator agg : aggGroup.getAggregators()) {
if (agg.getNamespace().equals(namespace)) {
// run the query
try {
respondWith(response, agg.queryValue(labelValue, dateValueAsDate, setOperator));
return;
} catch (Exception e) {
throw new ServletException(e);
}
}
}
// shouldn't get here, so bail with a meaning error on namespace
doError(response,
String.format("Unable to acquire Aggregator with Namespace %s", namespace));
}