private void saveOptimizationMetrics()

in runtime/master/src/main/java/org/apache/nemo/runtime/master/metric/MetricStore.java [314:360]


  /**
   * Saves one row of optimization metrics for every {@link JobMetric} in the metric map.
   *
   * For each job metric, a table named after its IR DAG summary is created if it does not
   * exist yet, and a row holding the job duration, input size, JVM and physical memory sizes,
   * and the vertex/edge properties is inserted into it. SQL failures are logged and do not
   * propagate; a job that has never started or never completed raises a MetricException.
   *
   * @param jobId  the job identifier, stored in the {@code note} column of the inserted row.
   * @param c      the open database connection to write to.
   * @param syntax database-specific SQL fragments; {@code syntax[0]} is used as the definition
   *               of the {@code id} column.
   */
  private void saveOptimizationMetrics(final String jobId, final Connection c, final String[] syntax) {
    try (Statement statement = c.createStatement()) {
      statement.setQueryTimeout(30);  // set timeout to 30 sec.

      getMetricMap(JobMetric.class).values().forEach(o -> {
        final JobMetric jobMetric = (JobMetric) o;
        // Identifiers cannot be bound as statement parameters, so the table name is concatenated.
        // NOTE(review): assumes the IR DAG summary is always a valid SQL identifier - confirm.
        final String tableName = jobMetric.getIrDagSummary();

        // The duration spans from the first READY -> EXECUTING transition to the first COMPLETE one.
        final long startTime = jobMetric.getStateTransitionEvents().stream()
          .filter(ste -> ste.getPrevState().equals(PlanState.State.READY)
            && ste.getNewState().equals(PlanState.State.EXECUTING))
          .findFirst().orElseThrow(() -> new MetricException("job has never started"))
          .getTimestamp();
        final long endTime = jobMetric.getStateTransitionEvents().stream()
          .filter(ste -> ste.getNewState().equals(PlanState.State.COMPLETE))
          .findFirst().orElseThrow(() -> new MetricException("job has never completed"))
          .getTimestamp();
        final long duration = endTime - startTime;  // ms
        final String vertexProperties = jobMetric.getVertexProperties();
        final String edgeProperties = jobMetric.getEdgeProperties();
        final Long inputSize = jobMetric.getInputSize();
        final long jvmMemSize = Runtime.getRuntime().maxMemory();
        final long memSize = ((com.sun.management.OperatingSystemMXBean) ManagementFactory
          .getOperatingSystemMXBean()).getTotalPhysicalMemorySize();

        try {
          statement.executeUpdate("CREATE TABLE IF NOT EXISTS " + tableName
            + " (id " + syntax[0] + ", duration BIGINT NOT NULL, inputsize BIGINT NOT NULL, "
            + "jvmmemsize BIGINT NOT NULL, memsize BIGINT NOT NULL, "
            + "vertex_properties TEXT NOT NULL, edge_properties TEXT NOT NULL, "
            + "note TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP);");
          LOG.info("CREATED TABLE For {} IF NOT PRESENT", tableName);

          // Values are bound as parameters rather than spliced into the SQL text, so that
          // quotes inside the property strings or the job id can neither break nor alter the query.
          final String insertSql = "INSERT INTO " + tableName
            + " (duration, inputsize, jvmmemsize, memsize, vertex_properties, edge_properties, note) "
            + "VALUES (?, ?, ?, ?, ?, ?, ?)";
          try (java.sql.PreparedStatement insert = c.prepareStatement(insertSql)) {
            insert.setQueryTimeout(30);  // same 30 sec. timeout as the DDL statement.
            insert.setLong(1, duration);
            if (inputSize == null) {
              // Let the NOT NULL constraint reject the row instead of failing on unboxing.
              insert.setNull(2, java.sql.Types.BIGINT);
            } else {
              insert.setLong(2, inputSize);
            }
            insert.setLong(3, jvmMemSize);
            insert.setLong(4, memSize);
            insert.setString(5, vertexProperties);
            insert.setString(6, edgeProperties);
            insert.setString(7, jobId);
            insert.executeUpdate();
          }
          LOG.info("Recorded metrics on the table for {}", tableName);
        } catch (SQLException e) {
          // The exception is passed as the throwable argument so that its stack trace is logged.
          LOG.error("Error while saving optimization metrics", e);
        }
      });
    } catch (SQLException e) {
      LOG.error("Error while saving optimization metrics", e);
    }
  }