private JobExecutionInfo processQueryById()

in gobblin-metastore/src/main/java/org/apache/gobblin/metastore/database/DatabaseJobHistoryStoreV100.java [527:644]


  /**
   * Loads the job execution record for the given job ID together with the related data requested
   * by {@code query}.
   *
   * <p>Only the first row returned by the job execution query is used. Job properties are always
   * loaded; when {@code query.hasJobProperties()} is true they are restricted to the comma-separated
   * keys in {@code query.getJobProperties()}. Job metrics are loaded only when
   * {@code query.isIncludeJobMetrics()} is true. Task executions (each with its task properties) are
   * loaded only when {@code query.isIncludeTaskExecutions()} is true, and task metrics additionally
   * require {@code query.isIncludeTaskMetrics()}.
   *
   * @param connection connection used to prepare all statements; it is not closed by this method
   * @param jobId ID of the job execution to load; must not be null or empty
   * @param query describes which optional pieces of data to load and which property keys to keep
   * @param tableFilter when {@code tableFilter.isPresent()}, its string form is appended to the task
   *        execution query with {@code AND} and its parameters are bound starting at index 2
   * @return the populated job execution info, or {@code null} if no row matches {@code jobId}
   * @throws SQLException if preparing or executing any statement, or reading any result set, fails
   * @throws IllegalArgumentException if {@code jobId} is null or empty
   */
  private JobExecutionInfo processQueryById(Connection connection, String jobId, JobExecutionQuery query,
                        Filter tableFilter) throws SQLException {
    Preconditions.checkArgument(!Strings.isNullOrEmpty(jobId));

    // Query job execution information
    try (PreparedStatement jobExecutionQueryStatement = connection
        .prepareStatement(JOB_EXECUTION_QUERY_BY_JOB_ID_STATEMENT_TEMPLATE)) {
      jobExecutionQueryStatement.setString(1, jobId);
      try (ResultSet jobRs = jobExecutionQueryStatement.executeQuery()) {
        if (!jobRs.next()) {
          return null;
        }

        JobExecutionInfo jobExecutionInfo = resultSetToJobExecutionInfo(jobRs);

        // Query job metrics
        if (query.isIncludeJobMetrics()) {
          try (PreparedStatement jobMetricQueryStatement = connection
              .prepareStatement(JOB_METRIC_QUERY_STATEMENT_TEMPLATE)) {
            // NOTE(review): column 2 of the job execution row is presumably the job ID - confirm
            // against JOB_EXECUTION_QUERY_BY_JOB_ID_STATEMENT_TEMPLATE.
            jobExecutionInfo.setMetrics(fetchMetricsFor(jobMetricQueryStatement, jobRs.getString(2)));
          }
        }

        // Query job properties (null key set means "keep every property")
        Set<String> requestedJobPropertyKeys =
            query.hasJobProperties() ? splitPropertyKeys(query.getJobProperties()) : null;
        try (PreparedStatement jobPropertiesQueryStatement = connection
            .prepareStatement(JOB_PROPERTY_QUERY_STATEMENT_TEMPLATE)) {
          jobExecutionInfo.setJobProperties(fetchPropertiesFor(jobPropertiesQueryStatement,
              jobExecutionInfo.getJobId(), requestedJobPropertyKeys));
        }

        // Query task execution information
        if (query.isIncludeTaskExecutions()) {
          String taskExecutionQuery = TASK_EXECUTION_QUERY_STATEMENT_TEMPLATE;
          // Add table filter if applicable
          if (tableFilter.isPresent()) {
            taskExecutionQuery += " AND " + tableFilter;
          }

          // These do not depend on the individual task row, so compute them once up front
          // instead of once per task.
          boolean includeTaskMetrics = query.isIncludeTaskMetrics();
          Set<String> requestedTaskPropertyKeys =
              query.hasTaskProperties() ? splitPropertyKeys(query.getTaskProperties()) : null;

          TaskExecutionInfoArray taskExecutionInfos = new TaskExecutionInfoArray();
          // The per-task statements are prepared once and re-bound for every task row. A null
          // resource is permitted by try-with-resources and is simply not closed.
          try (PreparedStatement taskExecutionQueryStatement = connection.prepareStatement(taskExecutionQuery);
              PreparedStatement taskMetricQueryStatement = includeTaskMetrics
                  ? connection.prepareStatement(TASK_METRIC_QUERY_STATEMENT_TEMPLATE) : null;
              PreparedStatement taskPropertiesQueryStatement = connection
                  .prepareStatement(TASK_PROPERTY_QUERY_STATEMENT_TEMPLATE)) {
            taskExecutionQueryStatement.setString(1, jobId);
            if (tableFilter.isPresent()) {
              tableFilter.addParameters(taskExecutionQueryStatement, 2);
            }
            try (ResultSet taskRs = taskExecutionQueryStatement.executeQuery()) {
              while (taskRs.next()) {
                TaskExecutionInfo taskExecutionInfo = resultSetToTaskExecutionInfo(taskRs);

                // Query task metrics for each task execution record
                if (includeTaskMetrics) {
                  taskExecutionInfo.setMetrics(
                      fetchMetricsFor(taskMetricQueryStatement, taskExecutionInfo.getTaskId()));
                }

                // Query task properties for each task execution record
                taskExecutionInfo.setTaskProperties(fetchPropertiesFor(taskPropertiesQueryStatement,
                    taskExecutionInfo.getTaskId(), requestedTaskPropertyKeys));

                taskExecutionInfos.add(taskExecutionInfo);
              }
            }
          }
          // Add task execution information
          jobExecutionInfo.setTaskExecutions(taskExecutionInfos);
        }
        return jobExecutionInfo;
      }
    }
  }

  /** Splits a comma-separated list of property keys into a set used for membership checks. */
  private Set<String> splitPropertyKeys(String commaSeparatedKeys) {
    return new HashSet<>(Arrays.asList(commaSeparatedKeys.split(",")));
  }

  /** Binds {@code id} as parameter 1 of {@code statement}, executes it and converts every row to a metric. */
  private MetricArray fetchMetricsFor(PreparedStatement statement, String id) throws SQLException {
    statement.setString(1, id);
    try (ResultSet metricRs = statement.executeQuery()) {
      MetricArray metrics = new MetricArray();
      while (metricRs.next()) {
        metrics.add(resultSetToMetric(metricRs));
      }
      return metrics;
    }
  }

  /**
   * Binds {@code id} as parameter 1 of {@code statement}, executes it and collects the returned
   * properties, keeping only keys contained in {@code requestedKeys} (all keys when it is null).
   */
  private StringMap fetchPropertiesFor(PreparedStatement statement, String id, Set<String> requestedKeys)
      throws SQLException {
    statement.setString(1, id);
    try (ResultSet propertiesRs = statement.executeQuery()) {
      Map<String, String> properties = Maps.newHashMap();
      while (propertiesRs.next()) {
        Map.Entry<String, String> property = resultSetToProperty(propertiesRs);
        if (requestedKeys == null || requestedKeys.contains(property.getKey())) {
          properties.put(property.getKey(), property.getValue());
        }
      }
      return new StringMap(properties);
    }
  }