in core/src/main/java/org/apache/sdap/mudrod/weblog/pre/SessionGenerator.java [204:323]
/**
 * Assigns a session id to every cleaned-up log entry of one user and returns
 * the value of the session counter after the last entry.
 *
 * <p>Entries whose "IP" field equals {@code user} are read from
 * {@code logIndex} in ascending "Time" order through a scroll search. Session
 * ids have the form {@code user + "@" + n}.
 *
 * <ul>
 * <li>HTTP entries: a missing referer, a referer of "-", a referer equal to
 * the site index URL, or a referer that does not contain the index URL starts
 * a new session. Otherwise the sessions are searched backwards (newest
 * session first, newest request first) for a request URL matching the
 * referer. The entry joins that session if the time gap is smaller than
 * {@code timeThres} multiplied by the number of requests rolled back;
 * otherwise it starts a new session.</li>
 * <li>FTP entries: the entry joins the current session unless more than
 * {@code timeThres} seconds separate it from that session's latest
 * request, in which case a new session is started.</li>
 * </ul>
 *
 * <p>NOTE(review): when the very first HTTP entry carries an in-site referer
 * it is stored under session {@code user + "@0"} without incrementing the
 * counter, so that session is not reflected in the return value - confirm
 * this is intended.
 *
 * @param es        Elasticsearch driver used for the search and the updates
 * @param user      value of the "IP" field identifying the user
 * @param timeThres time threshold in seconds
 * @return the session counter after all entries have been processed
 * @throws ElasticsearchException declared by the original signature
 * @throws IOException            declared by the original signature;
 *                                presumably raised by {@code update(...)} - confirm
 */
private int genSessionByReferer(ESDriver es, String user, int timeThres) throws ElasticsearchException, IOException {
  int sessionCountIn = 0;

  BoolQueryBuilder filterSearch = new BoolQueryBuilder();
  filterSearch.must(QueryBuilders.termQuery("IP", user));

  SearchResponse scrollResp = es.getClient()
      .prepareSearch(logIndex)
      .setTypes(this.cleanupType)
      .setScroll(new TimeValue(60000))
      .setQuery(filterSearch)
      .addSort("Time", SortOrder.ASC)
      .setSize(100)
      .execute()
      .actionGet();

  // session id -> (request URL -> time of that request), inner maps keep
  // requests ordered from oldest to most recent (see recordSessionRequest).
  Map<String, Map<String, DateTime>> sessionReqs = new HashMap<>();
  String ip = user;
  String indexUrl = props.getProperty(MudrodConstants.BASE_URL) + "/";
  DateTimeFormatter fmt = ISODateTimeFormat.dateTime();

  while (scrollResp.getHits().getHits().length != 0) {
    for (SearchHit hit : scrollResp.getHits().getHits()) {
      Map<String, Object> result = hit.getSource();
      String request = (String) result.get("RequestUrl");
      String referer = (String) result.get("Referer");
      String logType = (String) result.get("LogType");
      DateTime time = fmt.parseDateTime((String) result.get("Time"));
      String id = hit.getId();

      if (MudrodConstants.HTTP_LOG.equals(logType)) {
        // A missing referer is handled like "-" instead of failing with a
        // NullPointerException.
        if (referer == null || "-".equals(referer) || referer.equals(indexUrl) || !referer.contains(indexUrl)) {
          sessionCountIn++;
          recordSessionRequest(es, sessionReqs, ip + "@" + sessionCountIn, id, request, time);
        } else {
          int count = sessionCountIn;
          // Number of requests inspected so far while walking backwards;
          // it keeps growing across sessions.
          int rollbackNum = 0;
          while (true) {
            Map<String, DateTime> requests = sessionReqs.get(ip + "@" + count);
            if (requests == null) {
              recordSessionRequest(es, sessionReqs, ip + "@" + count, id, request, time);
              break;
            }
            ArrayList<String> keys = new ArrayList<>(requests.keySet());
            boolean bFindRefer = false;
            for (int i = keys.size() - 1; i >= 0; i--) {
              rollbackNum++;
              // referer is non-null here, so this is safe even for a null key.
              if (referer.equalsIgnoreCase(keys.get(i))) {
                bFindRefer = true;
                // The allowed gap grows with the roll-back distance; a larger
                // gap starts a new session.
                if (Math.abs(Seconds.secondsBetween(requests.get(keys.get(i)), time).getSeconds()) < timeThres * rollbackNum) {
                  recordSessionRequest(es, sessionReqs, ip + "@" + count, id, request, time);
                } else {
                  sessionCountIn++;
                  recordSessionRequest(es, sessionReqs, ip + "@" + sessionCountIn, id, request, time);
                }
                break;
              }
            }
            if (bFindRefer) {
              break;
            }
            count--;
            if (count < 0) {
              // Referer not found in any known session: start a new one.
              // The update goes to logIndex like every other update here.
              sessionCountIn++;
              recordSessionRequest(es, sessionReqs, ip + "@" + sessionCountIn, id, request, time);
              break;
            }
          }
        }
      } else if (MudrodConstants.FTP_LOG.equals(logType)) {
        // may affect computation efficiency
        Map<String, DateTime> requests = sessionReqs.get(ip + "@" + sessionCountIn);
        if (requests != null && !requests.isEmpty()) {
          ArrayList<String> keys = new ArrayList<>(requests.keySet());
          DateTime lastTime = requests.get(keys.get(keys.size() - 1));
          if (Math.abs(Seconds.secondsBetween(lastTime, time).getSeconds()) > timeThres) {
            sessionCountIn += 1;
          }
        }
        recordSessionRequest(es, sessionReqs, ip + "@" + sessionCountIn, id, request, time);
      }
    }
    scrollResp = es.getClient().prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(600000)).execute().actionGet();
  }
  return sessionCountIn;
}

/**
 * Remembers {@code request} as the most recent request of {@code sessionId}
 * (creating the session's map on first use) and calls {@code update(...)} to
 * set the "SessionID" field of log entry {@code id} in {@code logIndex}.
 */
private void recordSessionRequest(ESDriver es, Map<String, Map<String, DateTime>> sessionReqs, String sessionId, String id, String request, DateTime time) throws IOException {
  Map<String, DateTime> requests = sessionReqs.get(sessionId);
  if (requests == null) {
    // Insertion-ordered map: callers walk the keys from newest to oldest.
    // Fully qualified so no additional import is required.
    requests = new java.util.LinkedHashMap<>();
    sessionReqs.put(sessionId, requests);
  }
  // Remove first so a repeated URL moves to the "most recent" end.
  requests.remove(request);
  requests.put(request, time);
  update(es, logIndex, this.cleanupType, id, "SessionID", sessionId);
}