in core/src/main/java/org/apache/sdap/mudrod/weblog/pre/HistoryGenerator.java [66:158]
/**
 * Writes the user-history binary matrix CSV to the path configured under
 * {@code MudrodConstants.USER_HISTORY_PATH}.
 *
 * <p>Output format:
 * <ul>
 *   <li>Header row: {@code Num,} followed by the comma-joined IPs whose document count in the
 *       session-stats type is strictly greater than {@code MudrodConstants.QUERY_MIN}.</li>
 *   <li>One row per keyword searched by at least {@code QUERY_MIN} distinct IPs: the keyword,
 *       then a {@code 1} or {@code 0} cell for every header IP. Each cell is followed by a
 *       comma.</li>
 * </ul>
 *
 * <p>Any existing file at the target path is removed first. If the session-stats type has no
 * documents, no new file is written. I/O failures are logged rather than rethrown.
 */
private void generateBinaryMatrix() {
  try {
    File file = new File(props.getProperty(MudrodConstants.USER_HISTORY_PATH));
    // Remove stale output up front so an empty index never leaves an old matrix behind.
    if (file.exists() && !file.delete()) {
      LOG.warn("Could not delete existing user history file {}", file.getAbsolutePath());
    }

    List<String> logIndexList = es.getIndexListWithPrefix(
        props.getProperty(MudrodConstants.LOG_INDEX));
    String[] logIndices = logIndexList.toArray(new String[0]);
    String[] statictypeArray = new String[] {this.sessionStats};
    int docCount = es.getDocCount(logIndices, statictypeArray);
    LOG.info("{}: {}", this.sessionStats, docCount);
    if (docCount == 0) {
      // Nothing to aggregate; do not create an empty/partial CSV.
      return;
    }

    // Parsed once instead of once per aggregation bucket.
    int queryMin = Integer.parseInt(props.getProperty(MudrodConstants.QUERY_MIN));

    try (OutputStreamWriter osw = new OutputStreamWriter(
        new FileOutputStream(file.getAbsoluteFile()), StandardCharsets.UTF_8)) {
      // step 1: header row (one column per sufficiently active IP)
      List<String> ipList = collectActiveIps(logIndices, statictypeArray, docCount, queryMin);
      osw.write("Num" + "," + String.join(",", ipList) + "\n");
      // step 2: one row per sufficiently popular keyword
      writeKeywordRows(osw, logIndices, statictypeArray, docCount, queryMin, ipList);
    }
  } catch (IOException e) {
    LOG.error("Failed to generate Binary Matrix : ", e);
  }
}

/**
 * Runs a terms aggregation on the {@code IP} field and returns the IPs whose document count is
 * strictly greater than {@code queryMin}, in the order the aggregation returns its buckets.
 */
private List<String> collectActiveIps(String[] logIndices, String[] types, int docCount,
    int queryMin) {
  SearchResponse sr = es.getClient()
      .prepareSearch(logIndices)
      .setTypes(types)
      .setQuery(QueryBuilders.matchAllQuery())
      .setSize(0)
      .addAggregation(AggregationBuilders.terms("IPs")
          .field("IP")
          .size(docCount))
      .execute()
      .actionGet();
  Terms ips = sr.getAggregations().get("IPs");
  List<String> ipList = new ArrayList<>();
  for (Terms.Bucket entry : ips.getBuckets()) {
    // Filter out less active users/IPs.
    if (entry.getDocCount() > queryMin) {
      ipList.add(entry.getKey().toString());
    }
  }
  return ipList;
}

/**
 * Aggregates keywords (with an IP sub-aggregation) and writes one CSV row per keyword searched
 * by at least {@code queryMin} distinct IPs.
 *
 * <p>Each row is the keyword followed by a {@code 1} (the IP searched the keyword) or {@code 0}
 * cell for every IP in {@code ipList}. Each cell is terminated by a comma.
 */
private void writeKeywordRows(OutputStreamWriter osw, String[] logIndices, String[] types,
    int docCount, int queryMin, List<String> ipList) throws IOException {
  SearchResponse sr2 = es.getClient()
      .prepareSearch(logIndices)
      .setTypes(types)
      .setQuery(QueryBuilders.matchAllQuery())
      .setSize(0)
      .addAggregation(AggregationBuilders.terms("KeywordAgg")
          .field("keywords")
          .size(docCount)
          .subAggregation(AggregationBuilders.terms("IPAgg")
              .field("IP")
              .size(docCount)))
      .execute()
      .actionGet();
  Terms keywords = sr2.getAggregations().get("KeywordAgg");
  for (Terms.Bucket keyword : keywords.getBuckets()) {
    Terms ipAgg = keyword.getAggregations().get("IPAgg");
    // NOTE(review): keywords are kept with >= queryMin distinct IPs, while IPs above use a
    // strict > comparison; preserved as-is, confirm this asymmetry is intended.
    if (ipAgg.getBuckets().size() < queryMin) {
      continue;
    }
    Map<String, Integer> ipMap = new HashMap<>();
    for (Terms.Bucket ip : ipAgg.getBuckets()) {
      ipMap.put(ip.getKey().toString(), 1);
    }
    StringBuilder row = new StringBuilder();
    row.append(keyword.getKey()).append(',');
    for (String ip : ipList) {
      row.append(ipMap.getOrDefault(ip, 0)).append(',');
    }
    row.append('\n');
    osw.write(row.toString());
  }
}