in helix-core/src/main/java/org/apache/helix/tools/commandtools/ZkLogCSVFormatter.java [95:319]
private static void format(String logfilepath, String outputDir) throws FileNotFoundException {
try {
// input file
FileInputStream fis = new FileInputStream(logfilepath);
BufferedReader br = new BufferedReader(new InputStreamReader(fis));
// output files
FileOutputStream isFos = new FileOutputStream(outputDir + "/" + "idealState.csv");
BufferedWriter isBw = new BufferedWriter(new OutputStreamWriter(isFos));
FileOutputStream cfgFos = new FileOutputStream(outputDir + "/" + "config.csv");
BufferedWriter cfgBw = new BufferedWriter(new OutputStreamWriter(cfgFos));
FileOutputStream evFos = new FileOutputStream(outputDir + "/" + "externalView.csv");
BufferedWriter evBw = new BufferedWriter(new OutputStreamWriter(evFos));
FileOutputStream smdCntFos =
new FileOutputStream(outputDir + "/" + "stateModelDefStateCount.csv");
BufferedWriter smdCntBw = new BufferedWriter(new OutputStreamWriter(smdCntFos));
FileOutputStream smdNextFos =
new FileOutputStream(outputDir + "/" + "stateModelDefStateNext.csv");
BufferedWriter smdNextBw = new BufferedWriter(new OutputStreamWriter(smdNextFos));
FileOutputStream csFos = new FileOutputStream(outputDir + "/" + "currentState.csv");
BufferedWriter csBw = new BufferedWriter(new OutputStreamWriter(csFos));
FileOutputStream msgFos = new FileOutputStream(outputDir + "/" + "messages.csv");
BufferedWriter msgBw = new BufferedWriter(new OutputStreamWriter(msgFos));
FileOutputStream hrPerfFos =
new FileOutputStream(outputDir + "/" + "healthReportDefaultPerfCounters.csv");
BufferedWriter hrPerfBw = new BufferedWriter(new OutputStreamWriter(hrPerfFos));
FileOutputStream liFos = new FileOutputStream(outputDir + "/" + "liveInstances.csv");
BufferedWriter liBw = new BufferedWriter(new OutputStreamWriter(liFos));
formatter(cfgBw, "timestamp", "instanceName", "host", "port", "enabled");
formatter(isBw, "timestamp", "resourceName", "partitionNumber", "mode", "partition",
"instanceName", "priority");
formatter(evBw, "timestamp", "resourceName", "partition", "instanceName", "state");
formatter(smdCntBw, "timestamp", "stateModel", "state", "count");
formatter(smdNextBw, "timestamp", "stateModel", "from", "to", "next");
formatter(liBw, "timestamp", "instanceName", "sessionId", "Operation");
formatter(csBw, "timestamp", "resourceName", "partition", "instanceName", "sessionId",
"state");
formatter(msgBw, "timestamp", "resourceName", "partition", "instanceName", "sessionId",
"from", "to", "messageType", "messageState");
formatter(hrPerfBw, "timestamp", "instanceName", "availableCPUs", "averageSystemLoad",
"freeJvmMemory", "freePhysicalMemory", "totalJvmMemory");
Map<String, ZNRecord> liveInstanceSessionMap = new HashMap<String, ZNRecord>();
int pos;
String inputLine;
while ((inputLine = br.readLine()) != null) {
if (inputLine.indexOf("CONFIGS") != -1) {
pos = inputLine.indexOf("CONFIGS");
pos = inputLine.indexOf("data:{", pos);
if (pos != -1) {
String timestamp = getAttributeValue(inputLine, "time:");
ZNRecord record =
(ZNRecord) _deserializer.deserialize(inputLine.substring(pos + 5).getBytes());
formatter(cfgBw, timestamp, record.getId(), record.getSimpleField("HOST"),
record.getSimpleField("PORT"), record.getSimpleField("ENABLED"));
}
} else if (inputLine.indexOf("IDEALSTATES") != -1) {
pos = inputLine.indexOf("IDEALSTATES");
pos = inputLine.indexOf("data:{", pos);
if (pos != -1) {
String timestamp = getAttributeValue(inputLine, "time:");
ZNRecord record =
(ZNRecord) _deserializer.deserialize(inputLine.substring(pos + 5).getBytes());
// System.out.println("record=" + record);
for (String partition : record.getListFields().keySet()) {
List<String> preferenceList = record.getListFields().get(partition);
for (int i = 0; i < preferenceList.size(); i++) {
String instance = preferenceList.get(i);
formatter(isBw, timestamp, record.getId(),
record.getSimpleField(IdealStateProperty.NUM_PARTITIONS.toString()),
record.getSimpleField(IdealStateProperty.REBALANCE_MODE.toString()), partition,
instance, Integer.toString(i));
}
}
}
} else if (inputLine.indexOf("LIVEINSTANCES") != -1) {
pos = inputLine.indexOf("LIVEINSTANCES");
pos = inputLine.indexOf("data:{", pos);
if (pos != -1) {
String timestamp = getAttributeValue(inputLine, "time:");
ZNRecord record =
(ZNRecord) _deserializer.deserialize(inputLine.substring(pos + 5).getBytes());
formatter(liBw, timestamp, record.getId(), record.getSimpleField("SESSION_ID"), "ADD");
String zkSessionId = getAttributeValue(inputLine, "session:");
if (zkSessionId == null) {
System.err.println("no zk session id associated with the adding of live instance: "
+ inputLine);
} else {
liveInstanceSessionMap.put(zkSessionId, record);
}
}
} else if (inputLine.indexOf("EXTERNALVIEW") != -1) {
pos = inputLine.indexOf("EXTERNALVIEW");
pos = inputLine.indexOf("data:{", pos);
if (pos != -1) {
String timestamp = getAttributeValue(inputLine, "time:");
ZNRecord record =
(ZNRecord) _deserializer.deserialize(inputLine.substring(pos + 5).getBytes());
// System.out.println("record=" + record);
for (String partition : record.getMapFields().keySet()) {
Map<String, String> stateMap = record.getMapFields().get(partition);
for (String instance : stateMap.keySet()) {
String state = stateMap.get(instance);
formatter(evBw, timestamp, record.getId(), partition, instance, state);
}
}
}
} else if (inputLine.indexOf("STATEMODELDEFS") != -1) {
pos = inputLine.indexOf("STATEMODELDEFS");
pos = inputLine.indexOf("data:{", pos);
if (pos != -1) {
String timestamp = getAttributeValue(inputLine, "time:");
ZNRecord record =
(ZNRecord) _deserializer.deserialize(inputLine.substring(pos + 5).getBytes());
for (String stateInfo : record.getMapFields().keySet()) {
if (stateInfo.endsWith(".meta")) {
Map<String, String> metaMap = record.getMapFields().get(stateInfo);
formatter(smdCntBw, timestamp, record.getId(),
stateInfo.substring(0, stateInfo.indexOf('.')), metaMap.get("count"));
} else if (stateInfo.endsWith(".next")) {
Map<String, String> nextMap = record.getMapFields().get(stateInfo);
for (String destState : nextMap.keySet()) {
formatter(smdNextBw, timestamp, record.getId(),
stateInfo.substring(0, stateInfo.indexOf('.')), destState,
nextMap.get(destState));
}
}
}
}
} else if (inputLine.indexOf("CURRENTSTATES") != -1) {
pos = inputLine.indexOf("CURRENTSTATES");
pos = inputLine.indexOf("data:{", pos);
if (pos != -1) {
String timestamp = getAttributeValue(inputLine, "time:");
ZNRecord record =
(ZNRecord) _deserializer.deserialize(inputLine.substring(pos + 5).getBytes());
// System.out.println("record=" + record);
for (String partition : record.getMapFields().keySet()) {
Map<String, String> stateMap = record.getMapFields().get(partition);
String path = getAttributeValue(inputLine, "path:");
if (path != null) {
String instance = HelixUtil.getInstanceNameFromPath(path);
formatter(csBw, timestamp, record.getId(), partition, instance,
record.getSimpleField("SESSION_ID"), stateMap.get("CURRENT_STATE"));
}
}
}
} else if (inputLine.indexOf("MESSAGES") != -1) {
pos = inputLine.indexOf("MESSAGES");
pos = inputLine.indexOf("data:{", pos);
if (pos != -1) {
String timestamp = getAttributeValue(inputLine, "time:");
ZNRecord record =
(ZNRecord) _deserializer.deserialize(inputLine.substring(pos + 5).getBytes());
formatter(msgBw, timestamp, record.getSimpleField("RESOURCE_NAME"),
record.getSimpleField("PARTITION_NAME"), record.getSimpleField("TGT_NAME"),
record.getSimpleField("TGT_SESSION_ID"), record.getSimpleField("FROM_STATE"),
record.getSimpleField("TO_STATE"), record.getSimpleField("MSG_TYPE"),
record.getSimpleField("MSG_STATE"));
}
} else if (inputLine.indexOf("closeSession") != -1) {
String zkSessionId = getAttributeValue(inputLine, "session:");
if (zkSessionId == null) {
System.err.println("no zk session id associated with the closing of zk session: "
+ inputLine);
} else {
ZNRecord record = liveInstanceSessionMap.remove(zkSessionId);
// System.err.println("zkSessionId:" + zkSessionId + ", record:" + record);
if (record != null) {
String timestamp = getAttributeValue(inputLine, "time:");
formatter(liBw, timestamp, record.getId(), record.getSimpleField("SESSION_ID"),
"DELETE");
}
}
} else if (inputLine.indexOf("HEALTHREPORT/defaultPerfCounters") != -1) {
pos = inputLine.indexOf("HEALTHREPORT/defaultPerfCounters");
pos = inputLine.indexOf("data:{", pos);
if (pos != -1) {
String timestamp = getAttributeValue(inputLine, "time:");
ZNRecord record =
(ZNRecord) _deserializer.deserialize(inputLine.substring(pos + 5).getBytes());
String path = getAttributeValue(inputLine, "path:");
if (path != null) {
String instance = HelixUtil.getInstanceNameFromPath(path);
formatter(hrPerfBw, timestamp, instance, record.getSimpleField("availableCPUs"),
record.getSimpleField("averageSystemLoad"),
record.getSimpleField("freeJvmMemory"),
record.getSimpleField("freePhysicalMemory"),
record.getSimpleField("totalJvmMemory"));
}
}
}
}
br.close();
isBw.close();
cfgBw.close();
evBw.close();
smdCntBw.close();
smdNextBw.close();
csBw.close();
msgBw.close();
liBw.close();
hrPerfBw.close();
} catch (Exception e) {
System.err.println("Error: " + e.getMessage());
}
}