in core/src/main/java/org/apache/sdap/mudrod/weblog/pre/SessionGenerator.java [350:456]
private void combineShortSessions(ESDriver es, String user, int timeThres) throws ElasticsearchException, IOException {
BoolQueryBuilder filterSearch = new BoolQueryBuilder();
filterSearch.must(QueryBuilders.termQuery("IP", user));
String[] indexArr = new String[] { logIndex };
String[] typeArr = new String[] { cleanupType };
int docCount = es.getDocCount(indexArr, typeArr, filterSearch);
if (docCount < 3) {
deleteInvalid(es, user);
return;
}
BoolQueryBuilder filterCheck = new BoolQueryBuilder();
filterCheck.must(QueryBuilders.termQuery("IP", user)).must(QueryBuilders.termQuery("Referer", "-"));
SearchResponse checkReferer = es.getClient()
.prepareSearch(logIndex)
.setTypes(this.cleanupType)
.setScroll(new TimeValue(60000))
.setQuery(filterCheck)
.setSize(0)
.execute()
.actionGet();
long numInvalid = checkReferer.getHits().getTotalHits();
double invalidRate = (double)numInvalid / docCount;
if (invalidRate >= 0.8) {
deleteInvalid(es, user);
return;
}
StatsAggregationBuilder statsAgg = AggregationBuilders.stats("Stats").field("Time");
SearchResponse srSession = es.getClient()
.prepareSearch(logIndex)
.setTypes(this.cleanupType)
.setScroll(new TimeValue(60000))
.setQuery(filterSearch)
.addAggregation(AggregationBuilders.terms("Sessions")
.field("SessionID")
.size(docCount)
.subAggregation(statsAgg))
.execute()
.actionGet();
Terms sessions = srSession.getAggregations().get("Sessions");
List<Session> sessionList = new ArrayList<>();
for (Terms.Bucket session : sessions.getBuckets()) {
Stats agg = session.getAggregations().get("Stats");
Session sess = new Session(props, es, agg.getMinAsString(), agg.getMaxAsString(), session.getKey().toString());
sessionList.add(sess);
}
Collections.sort(sessionList);
DateTimeFormatter fmt = ISODateTimeFormat.dateTime();
String last = null;
String lastnewID = null;
String lastoldID = null;
String current;
for (Session s : sessionList) {
current = s.getEndTime();
if (last != null) {
if (Seconds.secondsBetween(fmt.parseDateTime(last), fmt.parseDateTime(current)).getSeconds() < timeThres) {
if (lastnewID == null) {
s.setNewID(lastoldID);
} else {
s.setNewID(lastnewID);
}
QueryBuilder fs = QueryBuilders.boolQuery().filter(QueryBuilders.termQuery("SessionID", s.getID()));
SearchResponse scrollResp = es.getClient()
.prepareSearch(logIndex)
.setTypes(this.cleanupType)
.setScroll(new TimeValue(60000))
.setQuery(fs)
.setSize(100)
.execute()
.actionGet();
while (true) {
for (SearchHit hit : scrollResp.getHits().getHits()) {
if (lastnewID == null) {
update(es, logIndex, this.cleanupType, hit.getId(), "SessionID", lastoldID);
} else {
update(es, logIndex, this.cleanupType, hit.getId(), "SessionID", lastnewID);
}
}
scrollResp = es.getClient()
.prepareSearchScroll(scrollResp.getScrollId())
.setScroll(new TimeValue(600000))
.execute()
.actionGet();
if (scrollResp.getHits().getHits().length == 0) {
break;
}
}
}
}
lastoldID = s.getID();
lastnewID = s.getNewID();
last = current;
}
}