public synchronized void put()

in gobblin-metastore/src/main/java/org/apache/gobblin/metastore/database/DatabaseJobHistoryStoreV100.java [184:272]


  /**
   * Inserts or updates the given job execution record together with its job metrics,
   * job properties, and, for every task execution it carries, the task record, task
   * metrics and task properties.
   *
   * <p>All statements run on a single connection with auto-commit disabled and are
   * committed once at the end. If the commit does not complete for any reason, the
   * transaction is rolled back before the connection is closed.
   *
   * @param jobExecutionInfo the job execution information to store
   * @throws IOException if a {@link SQLException} is raised while obtaining the connection,
   *         executing any statement, or committing; the {@link SQLException} is set as the cause
   */
  public synchronized void put(JobExecutionInfo jobExecutionInfo) throws IOException {
    Optional<Connection> connectionOptional = Optional.absent();
    // Set to true only after commit() returns normally; drives the rollback in finally.
    boolean committed = false;
    try {
      connectionOptional = Optional.of(getConnection());
      Connection connection = connectionOptional.get();
      connection.setAutoCommit(false);

      // Insert or update job execution information
      if (existsJobExecutionInfo(connection, jobExecutionInfo)) {
        updateJobExecutionInfo(connection, jobExecutionInfo);
      } else {
        insertJobExecutionInfo(connection, jobExecutionInfo);
      }

      // Insert or update job metrics
      if (jobExecutionInfo.hasMetrics()) {
        for (Metric metric : jobExecutionInfo.getMetrics()) {
          boolean insert =
              !existsMetric(connection, JOB_METRIC_EXIST_QUERY_STATEMENT_TEMPLATE, jobExecutionInfo.getJobId(), metric);
          updateMetric(connection, insert ? JOB_METRIC_INSERT_STATEMENT_TEMPLATE : JOB_METRIC_UPDATE_STATEMENT_TEMPLATE,
              jobExecutionInfo.getJobId(), metric, insert);
        }
      }

      // Insert or update job properties
      if (jobExecutionInfo.hasJobProperties()) {
        for (Map.Entry<String, String> entry : jobExecutionInfo.getJobProperties().entrySet()) {
          boolean insert = !existsProperty(connection, JOB_PROPERTY_EXIST_QUERY_STATEMENT_TEMPLATE,
              jobExecutionInfo.getJobId(), entry.getKey());
          updateProperty(connection,
              insert ? JOB_PROPERTY_INSERT_STATEMENT_TEMPLATE : JOB_PROPERTY_UPDATE_STATEMENT_TEMPLATE,
              jobExecutionInfo.getJobId(), entry.getKey(), entry.getValue(), insert);
        }
      }

      // Insert or update task execution information
      if (jobExecutionInfo.hasTaskExecutions()) {
        for (TaskExecutionInfo info : jobExecutionInfo.getTaskExecutions()) {
          // Insert or update the task execution record itself
          if (existsTaskExecutionInfo(connection, info)) {
            updateTaskExecutionInfo(connection, info);
          } else {
            insertTaskExecutionInfo(connection, info);
          }

          // Insert or update task metrics
          if (info.hasMetrics()) {
            for (Metric metric : info.getMetrics()) {
              boolean insert =
                  !existsMetric(connection, TASK_METRIC_EXIST_QUERY_STATEMENT_TEMPLATE, info.getTaskId(), metric);
              updateMetric(connection,
                  insert ? TASK_METRIC_INSERT_STATEMENT_TEMPLATE : TASK_METRIC_UPDATE_STATEMENT_TEMPLATE,
                  info.getTaskId(), metric, insert);
            }
          }

          // Insert or update task properties
          if (info.hasTaskProperties()) {
            for (Map.Entry<String, String> entry : info.getTaskProperties().entrySet()) {
              boolean insert = !existsProperty(connection, TASK_PROPERTY_EXIST_QUERY_STATEMENT_TEMPLATE,
                  info.getTaskId(), entry.getKey());
              updateProperty(connection,
                  insert ? TASK_PROPERTY_INSERT_STATEMENT_TEMPLATE : TASK_PROPERTY_UPDATE_STATEMENT_TEMPLATE,
                  info.getTaskId(), entry.getKey(), entry.getValue(), insert);
            }
          }
        }
      }

      connection.commit();
      committed = true;
    } catch (SQLException se) {
      LOGGER.error("Failed to put a new job execution information record", se);
      // Rollback and close happen in the finally block below.
      throw new IOException(se);
    } finally {
      if (connectionOptional.isPresent()) {
        Connection connection = connectionOptional.get();
        if (!committed) {
          // Roll back on ANY failure (SQLException or unchecked): closing a connection with
          // an active transaction has implementation-defined results per the JDBC contract.
          try {
            connection.rollback();
          } catch (SQLException se) {
            LOGGER.error("Failed to rollback", se);
          }
        }
        try {
          connection.close();
        } catch (SQLException se) {
          LOGGER.error("Failed to close connection", se);
        }
      }
    }
  }