in core/src/main/java/org/apache/sdap/mudrod/weblog/pre/SessionStatistic.java [146:324]
/**
 * Computes the statistics of a single session and, if the session passes the
 * configured request-count limits, queues one statistics document for indexing.
 *
 * <p>The method first runs a stats aggregation over the {@code Time} field of
 * all log entries of the session to obtain its start, end and duration in
 * seconds. It then scrolls through every log entry of the session and collects:
 * <ul>
 *   <li>the number of requests containing the search marker, plus the analyzed
 *       search keywords extracted from them;</li>
 *   <li>the number of requests starting with the view marker, plus the datasets
 *       returned by {@code findDataset};</li>
 *   <li>the number of FTP log entries, plus the requested paths that do not end
 *       in {@code .jpg}, {@code .pdf}, {@code .txt} or {@code .gif}.</li>
 * </ul>
 *
 * <p>A document is queued only when the session has at least one search request
 * and at least one view request, and the search, view and FTP counts do not
 * exceed the limits read from {@code SEARCH_F}, {@code VIEW_F} and
 * {@code DOWNLOAD_F}.
 *
 * @param es        driver providing the Elasticsearch client, the custom
 *                  analyzer and the bulk processor
 * @param sessionId value of the {@code SessionID} field identifying the session
 * @return {@code 1} if a statistics document was added to the bulk processor,
 *         {@code 0} otherwise
 * @throws IOException          if building the JSON document fails
 * @throws InterruptedException declared by the signature; presumably propagated
 *                              from {@code es.customAnalyzing} (confirm against ESDriver)
 * @throws ExecutionException   declared by the signature; presumably propagated
 *                              from {@code es.customAnalyzing} (confirm against ESDriver)
 */
private int processSession(ESDriver es, String sessionId) throws IOException, InterruptedException, ExecutionException {
    String inputType = cleanupType;
    String outputType = sessionStats;
    DateTimeFormatter fmt = ISODateTimeFormat.dateTime();
    // Captures whatever sits between "get " and " http" in a lower-cased request line.
    Pattern pattern = Pattern.compile("get (.*?) http/*");

    BoolQueryBuilder filterSearch = new BoolQueryBuilder();
    filterSearch.must(QueryBuilders.termQuery("SessionID", sessionId));

    // Session time span: min/max of the "Time" field over all entries of the session.
    StatsAggregationBuilder statsAgg = AggregationBuilders.stats("Stats").field("Time");
    SearchResponse sr = es.getClient()
        .prepareSearch(logIndex)
        .setTypes(inputType)
        .setQuery(filterSearch)
        .addAggregation(statsAgg)
        .execute()
        .actionGet();
    Stats agg = sr.getAggregations().get("Stats");
    String min = agg.getMinAsString();
    String max = agg.getMaxAsString();
    DateTime start = fmt.parseDateTime(min);
    DateTime end = fmt.parseDateTime(max);
    int duration = Seconds.secondsBetween(start, end).getSeconds();

    // These two values are never updated in this method; they are written to the
    // document with their initial value, exactly as before.
    float requestRate = 0;
    int searchDataListRequestByKeywordsCount = 0;

    int searchDataListRequestCount = 0;
    int searchDataRequestCount = 0;
    int ftpRequestCount = 0;
    String iP = null;
    String keywords = "";
    String views = "";
    String downloads = "";

    // Configuration values do not change while scrolling, so read them once.
    String datasetlist = props.getProperty(MudrodConstants.SEARCH_MARKER);
    String dataset = props.getProperty(MudrodConstants.VIEW_MARKER);
    String analyzerIndex = props.getProperty(MudrodConstants.ES_INDEX_NAME);

    SearchResponse scrollResp = es.getClient()
        .prepareSearch(logIndex)
        .setTypes(inputType)
        .setScroll(new TimeValue(60000))
        .setQuery(filterSearch)
        .setSize(100)
        .execute().actionGet();

    do {
        for (SearchHit hit : scrollResp.getHits().getHits()) {
            Map<String, Object> result = hit.getSource();
            String request = (String) result.get("Request");
            String logType = (String) result.get("LogType");
            // The IP of the last processed entry ends up in the document.
            iP = (String) result.get("IP");

            // Reduce the raw request line to the requested path when it matches;
            // otherwise the original request string is used unchanged.
            Matcher matcher = pattern.matcher(request.trim().toLowerCase(Locale.ENGLISH));
            while (matcher.find()) {
                request = matcher.group(1);
            }

            if (request.contains(datasetlist)) {
                searchDataListRequestCount++;
                RequestUrl requestURL = new RequestUrl();
                String infoStr = requestURL.getSearchInfo(request) + ",";
                String info = es.customAnalyzing(analyzerIndex, infoStr);
                if (!",".equals(info)) {
                    if ("".equals(keywords)) {
                        keywords = keywords + info;
                    } else {
                        String[] items = info.split(",");
                        String[] keywordList = keywords.split(",");
                        for (String item : items) {
                            if (!Arrays.asList(keywordList).contains(item)) {
                                keywords = keywords + item + ",";
                            }
                        }
                    }
                }
            }

            if (request.startsWith(dataset)) {
                searchDataRequestCount++;
                // Resolve the dataset once instead of calling findDataset twice.
                String view = findDataset(request);
                if (view != null) {
                    views = appendIfAbsent(views, view);
                }
            }

            if (MudrodConstants.FTP_LOG.equals(logType)) {
                ftpRequestCount++;
                String requestLowercase = request.toLowerCase(Locale.ENGLISH);
                boolean skippedFileType = requestLowercase.endsWith(".jpg")
                    || requestLowercase.endsWith(".pdf")
                    || requestLowercase.endsWith(".txt")
                    || requestLowercase.endsWith(".gif");
                // Skipped file types still count as FTP requests but are not listed.
                if (!skippedFileType) {
                    downloads = appendIfAbsent(downloads, request);
                }
            }
        }

        scrollResp = es.getClient()
            .prepareSearchScroll(scrollResp.getScrollId())
            .setScroll(new TimeValue(600000))
            .execute()
            .actionGet();
        // Stop once a scroll page comes back without hits.
    } while (scrollResp.getHits().getHits().length != 0);

    // Release the server-side scroll context now rather than waiting for its
    // keep-alive to expire; this method runs once per session.
    es.getClient()
        .prepareClearScroll()
        .addScrollId(scrollResp.getScrollId())
        .execute()
        .actionGet();

    int keywordsNum = 0;
    if (!"".equals(keywords)) {
        keywordsNum = keywords.split(",").length;
    }

    boolean withinLimits = searchDataListRequestCount != 0
        && searchDataListRequestCount <= Integer.parseInt(props.getProperty(MudrodConstants.SEARCH_F))
        && searchDataRequestCount != 0
        && searchDataRequestCount <= Integer.parseInt(props.getProperty(MudrodConstants.VIEW_F))
        && ftpRequestCount <= Integer.parseInt(props.getProperty(MudrodConstants.DOWNLOAD_F));
    if (!withinLimits) {
        return 0;
    }

    String sessionURL = props.getProperty(MudrodConstants.SESSION_PORT)
        + props.getProperty(MudrodConstants.SESSION_URL)
        + "?sessionid=" + sessionId
        + "&sessionType=" + outputType
        + "&requestType=" + inputType;

    IndexRequest ir = new IndexRequest(logIndex, outputType).source(
        jsonBuilder().startObject()
            .field("SessionID", sessionId)
            .field("SessionURL", sessionURL)
            .field("Duration", duration)
            .field("Number of Keywords", keywordsNum)
            .field("Time", min)
            .field("End_time", max)
            .field("searchDataListRequest_count", searchDataListRequestCount)
            .field("searchDataListRequest_byKeywords_count", searchDataListRequestByKeywordsCount)
            .field("searchDataRequest_count", searchDataRequestCount)
            .field("keywords", es.customAnalyzing(logIndex, keywords))
            .field("views", views)
            .field("downloads", downloads)
            .field("request_rate", requestRate)
            .field("Comments", "")
            .field("Validation", 0)
            .field("Produceby", 0)
            .field("Correlation", 0)
            .field("IP", iP).endObject());
    es.getBulkProcessor().add(ir);
    return 1;
}

/**
 * Appends {@code item} to a comma-separated list unless the list already holds
 * an identical entry. Entries are compared as whole values, so an item that is
 * merely a substring of an existing entry is still added.
 *
 * @param csv  current comma-separated list, possibly empty
 * @param item entry to add; an empty item leaves the list unchanged
 * @return the list including {@code item}
 */
private static String appendIfAbsent(String csv, String item) {
    if (item.isEmpty()) {
        return csv;
    }
    if (csv.isEmpty()) {
        return item;
    }
    // Wrap both sides in delimiters so only complete entries can match.
    if (("," + csv + ",").contains("," + item + ",")) {
        return csv;
    }
    return csv + "," + item;
}