in src/main/java/com/amazonaws/services/kinesis/aggregators/app/DateQueryServlet.java [89:162]
public void doAction(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException {
String namespace = request.getParameter(NAMESPACE_PARAM);
String dateValue = request.getParameter(DATE_VALUE_PARAM);
String operator = request.getParameter(OPERATOR_PARAM);
String granularity = request.getParameter(GRANULARITY_PARAM);
// create the date item
Date d = null;
try {
d = StreamAggregator.dateFormatter.parse(dateValue);
} catch (Exception e) {
doError(response, String.format("Date Parameter must be in format %s",
StreamAggregator.dateFormatter.getDateFormatSymbols().toString()));
return;
}
// create the ComparisonOperator for Dynamo from the argument
ComparisonOperator c = null;
try {
c = ComparisonOperator.fromValue(operator);
} catch (Exception e) {
doError(response, String.format("%s is an invalid Comparison Operator", operator));
return;
}
// create the Time Horizon value from the argument
TimeHorizon h = null;
try {
h = TimeHorizon.valueOf(granularity);
} catch (Exception e) {
doError(response, String.format("%s is an invalid Granularity", granularity));
return;
}
String streamName = (String) request.getServletContext().getAttribute(
AggregatorsConstants.STREAM_NAME_PARAM);
AggregatorGroup aggGroup = (AggregatorGroup) request.getServletContext().getAttribute(
AggregatorsBeanstalkApp.AGGREGATOR_GROUP_PARAM);
if (aggGroup == null) {
doError(response, "Aggregator Application Not Initialised");
return;
}
// initialise the aggregator group onto shard 'none' for this operation
// - it may already be initialised
try {
aggGroup.initialize("none");
} catch (Exception e) {
throw new ServletException(e);
}
// put the initialised aggregator group back into the context
request.getServletContext().setAttribute(AggregatorsBeanstalkApp.AGGREGATOR_GROUP_PARAM,
aggGroup);
// acquire the correct aggregator by namespace
for (StreamAggregator agg : aggGroup.getAggregators()) {
if (agg.getNamespace().equals(namespace)) {
// run the query
try {
respondWith(response, agg.queryByDate(d, h, c, QUERY_THREADS));
return;
} catch (Exception e) {
throw new ServletException(e);
}
}
}
// shouldn't get here, so bail with a meaning error on namespace
doError(response,
String.format("Unable to acquire Aggregator with Namespace %s", namespace));
}