private void combineShortSessions()

in core/src/main/java/org/apache/sdap/mudrod/weblog/pre/SessionGenerator.java [350:456]


  /**
   * Merges sessions of a single user that lie close together in time, or
   * discards the user's log entries altogether when they look invalid.
   *
   * <p>Steps, in order:
   * <ol>
   * <li>Count the documents whose {@code IP} field equals {@code user}; with
   * fewer than three, {@code deleteInvalid} is called and the method returns.</li>
   * <li>Count the subset whose {@code Referer} is {@code "-"}; when that subset
   * is at least 80% of the user's documents, {@code deleteInvalid} is called
   * and the method returns.</li>
   * <li>Group the user's documents by {@code SessionID}, taking the minimum and
   * maximum of the {@code Time} field per group, and sort the resulting
   * {@link Session} objects by their natural ordering.</li>
   * <li>Walk the sorted sessions; whenever the gap to the previous session is
   * below {@code timeThres} seconds, rewrite the {@code SessionID} of every
   * document of the current session to the ID the previous session carries.</li>
   * </ol>
   *
   * @param es        driver giving access to the Elasticsearch client
   * @param user      value matched against the {@code IP} field
   * @param timeThres merge threshold in seconds (exclusive upper bound of the gap)
   * @throws ElasticsearchException if an Elasticsearch request fails
   * @throws IOException            declared for the document update calls
   */
  private void combineShortSessions(ESDriver es, String user, int timeThres) throws ElasticsearchException, IOException {

    BoolQueryBuilder filterSearch = new BoolQueryBuilder();
    filterSearch.must(QueryBuilders.termQuery("IP", user));

    String[] indexArr = new String[] { logIndex };
    String[] typeArr = new String[] { cleanupType };
    int docCount = es.getDocCount(indexArr, typeArr, filterSearch);

    if (docCount < 3) {
      deleteInvalid(es, user);
      return;
    }

    // Pure count query: no hits are fetched and nothing is scrolled, so no
    // scroll context is requested.
    BoolQueryBuilder filterCheck = new BoolQueryBuilder();
    filterCheck.must(QueryBuilders.termQuery("IP", user)).must(QueryBuilders.termQuery("Referer", "-"));
    SearchResponse checkReferer = es.getClient()
            .prepareSearch(logIndex)
            .setTypes(this.cleanupType)
            .setQuery(filterCheck)
            .setSize(0)
            .execute()
            .actionGet();

    long numInvalid = checkReferer.getHits().getTotalHits();
    double invalidRate = (double) numInvalid / docCount;

    if (invalidRate >= 0.8) {
      deleteInvalid(es, user);
      return;
    }

    // Only the aggregation result is read below, so hits are suppressed and no
    // scroll context is opened. The terms size is docCount because a user
    // cannot have more distinct sessions than documents.
    StatsAggregationBuilder statsAgg = AggregationBuilders.stats("Stats").field("Time");
    SearchResponse srSession = es.getClient()
            .prepareSearch(logIndex)
            .setTypes(this.cleanupType)
            .setQuery(filterSearch)
            .setSize(0)
            .addAggregation(AggregationBuilders.terms("Sessions")
                    .field("SessionID")
                    .size(docCount)
                    .subAggregation(statsAgg))
            .execute()
            .actionGet();

    Terms sessions = srSession.getAggregations().get("Sessions");

    List<Session> sessionList = new ArrayList<>();
    for (Terms.Bucket session : sessions.getBuckets()) {
      Stats agg = session.getAggregations().get("Stats");
      Session sess = new Session(props, es, agg.getMinAsString(), agg.getMaxAsString(), session.getKey().toString());
      sessionList.add(sess);
    }

    Collections.sort(sessionList);
    DateTimeFormatter fmt = ISODateTimeFormat.dateTime();
    String last = null;
    String lastnewID = null;
    String lastoldID = null;
    String current;
    for (Session s : sessionList) {
      // NOTE(review): the gap is measured between the getEndTime() values of
      // consecutive sessions; confirm that end-to-end (rather than previous
      // end to current start) is the intended distance.
      current = s.getEndTime();
      if (last != null
              && Seconds.secondsBetween(fmt.parseDateTime(last), fmt.parseDateTime(current)).getSeconds() < timeThres) {
        // If the previous session was itself merged, keep chaining onto the ID
        // it was merged into; otherwise adopt the previous session's own ID.
        String targetID = (lastnewID == null) ? lastoldID : lastnewID;
        s.setNewID(targetID);
        reassignSessionDocs(es, s.getID(), targetID);
      }
      lastoldID = s.getID();
      lastnewID = s.getNewID();
      last = current;
    }

  }

  /**
   * Rewrites the {@code SessionID} field of every document currently labelled
   * {@code sessionID} to {@code targetID}, paging through the matches with a
   * scroll and releasing the scroll context afterwards.
   *
   * @param es        driver giving access to the Elasticsearch client
   * @param sessionID session ID whose documents are re-labelled
   * @param targetID  session ID written into those documents
   * @throws ElasticsearchException if a search or scroll request fails
   * @throws IOException            declared for the document update calls
   */
  private void reassignSessionDocs(ESDriver es, String sessionID, String targetID) throws ElasticsearchException, IOException {
    QueryBuilder fs = QueryBuilders.boolQuery().filter(QueryBuilders.termQuery("SessionID", sessionID));

    SearchResponse scrollResp = es.getClient()
            .prepareSearch(logIndex)
            .setTypes(this.cleanupType)
            .setScroll(new TimeValue(60000))
            .setQuery(fs)
            .setSize(100)
            .execute()
            .actionGet();
    // The scroll ID may change between pages; always keep the latest one.
    String scrollId = scrollResp.getScrollId();
    try {
      while (scrollResp.getHits().getHits().length > 0) {
        for (SearchHit hit : scrollResp.getHits().getHits()) {
          update(es, logIndex, this.cleanupType, hit.getId(), "SessionID", targetID);
        }

        scrollResp = es.getClient()
                .prepareSearchScroll(scrollId)
                .setScroll(new TimeValue(600000))
                .execute()
                .actionGet();
        scrollId = scrollResp.getScrollId();
      }
    } finally {
      if (scrollId != null) {
        try {
          es.getClient().prepareClearScroll().addScrollId(scrollId).execute().actionGet();
        } catch (ElasticsearchException ignored) {
          // Best-effort cleanup: an uncleared context still expires on its own
          // once the keep-alive elapses, so a failure here must not fail the merge.
        }
      }
    }
  }