in runtime/master/src/main/java/org/apache/nemo/runtime/master/metric/MetricStore.java [314:360]
private void saveOptimizationMetrics(final String jobId, final Connection c, final String[] syntax) {
try (Statement statement = c.createStatement()) {
statement.setQueryTimeout(30); // set timeout to 30 sec.
getMetricMap(JobMetric.class).values().forEach(o -> {
final JobMetric jobMetric = (JobMetric) o;
final String tableName = jobMetric.getIrDagSummary();
final long startTime = jobMetric.getStateTransitionEvents().stream()
.filter(ste -> ste.getPrevState().equals(PlanState.State.READY)
&& ste.getNewState().equals(PlanState.State.EXECUTING))
.findFirst().orElseThrow(() -> new MetricException("job has never started"))
.getTimestamp();
final long endTime = jobMetric.getStateTransitionEvents().stream()
.filter(ste -> ste.getNewState().equals(PlanState.State.COMPLETE))
.findFirst().orElseThrow(() -> new MetricException("job has never completed"))
.getTimestamp();
final long duration = endTime - startTime; // ms
final String vertexProperties = jobMetric.getVertexProperties();
final String edgeProperties = jobMetric.getEdgeProperties();
final Long inputSize = jobMetric.getInputSize();
final long jvmMemSize = Runtime.getRuntime().maxMemory();
final long memSize = ((com.sun.management.OperatingSystemMXBean) ManagementFactory
.getOperatingSystemMXBean()).getTotalPhysicalMemorySize();
try {
statement.executeUpdate("CREATE TABLE IF NOT EXISTS " + tableName
+ " (id " + syntax[0] + ", duration BIGINT NOT NULL, inputsize BIGINT NOT NULL, "
+ "jvmmemsize BIGINT NOT NULL, memsize BIGINT NOT NULL, "
+ "vertex_properties TEXT NOT NULL, edge_properties TEXT NOT NULL, "
+ "note TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP);");
LOG.info("CREATED TABLE For {} IF NOT PRESENT", tableName);
statement.executeUpdate("INSERT INTO " + tableName
+ " (duration, inputsize, jvmmemsize, memsize, vertex_properties, edge_properties, note) "
+ "VALUES (" + duration + ", " + inputSize + ", "
+ jvmMemSize + ", " + memSize + ", '"
+ vertexProperties + "', '" + edgeProperties + "', '" + jobId + "');");
LOG.info("Recorded metrics on the table for {}", tableName);
} catch (SQLException e) {
LOG.error("Error while saving optimization metrics: {}", e);
}
});
} catch (SQLException e) {
LOG.error("Error while saving optimization metrics: {}", e);
}
}