in gobblin-metastore/src/main/java/org/apache/gobblin/metastore/database/DatabaseJobHistoryStoreV100.java [527:644]
private JobExecutionInfo processQueryById(Connection connection, String jobId, JobExecutionQuery query,
Filter tableFilter) throws SQLException {
Preconditions.checkArgument(!Strings.isNullOrEmpty(jobId));
// Query job execution information
try (PreparedStatement jobExecutionQueryStatement = connection
.prepareStatement(JOB_EXECUTION_QUERY_BY_JOB_ID_STATEMENT_TEMPLATE)) {
jobExecutionQueryStatement.setString(1, jobId);
try (ResultSet jobRs = jobExecutionQueryStatement.executeQuery()) {
if (!jobRs.next()) {
return null;
}
JobExecutionInfo jobExecutionInfo = resultSetToJobExecutionInfo(jobRs);
// Query job metrics
if (query.isIncludeJobMetrics()) {
try (PreparedStatement jobMetricQueryStatement = connection
.prepareStatement(JOB_METRIC_QUERY_STATEMENT_TEMPLATE)) {
jobMetricQueryStatement.setString(1, jobRs.getString(2));
try (ResultSet jobMetricRs = jobMetricQueryStatement.executeQuery()) {
MetricArray jobMetrics = new MetricArray();
while (jobMetricRs.next()) {
jobMetrics.add(resultSetToMetric(jobMetricRs));
}
// Add job metrics
jobExecutionInfo.setMetrics(jobMetrics);
}
}
}
// Query job properties
Set<String> requestedJobPropertyKeys = null;
if (query.hasJobProperties()) {
requestedJobPropertyKeys = new HashSet<>(Arrays.asList(query.getJobProperties().split(",")));
}
try (PreparedStatement jobPropertiesQueryStatement = connection
.prepareStatement(JOB_PROPERTY_QUERY_STATEMENT_TEMPLATE)) {
jobPropertiesQueryStatement.setString(1, jobExecutionInfo.getJobId());
try (ResultSet jobPropertiesRs = jobPropertiesQueryStatement.executeQuery()) {
Map<String, String> jobProperties = Maps.newHashMap();
while (jobPropertiesRs.next()) {
Map.Entry<String, String> property = resultSetToProperty(jobPropertiesRs);
if (requestedJobPropertyKeys == null || requestedJobPropertyKeys.contains(property.getKey())) {
jobProperties.put(property.getKey(), property.getValue());
}
}
// Add job properties
jobExecutionInfo.setJobProperties(new StringMap(jobProperties));
}
}
// Query task execution information
if (query.isIncludeTaskExecutions()) {
TaskExecutionInfoArray taskExecutionInfos = new TaskExecutionInfoArray();
String taskExecutionQuery = TASK_EXECUTION_QUERY_STATEMENT_TEMPLATE;
// Add table filter if applicable
if (tableFilter.isPresent()) {
taskExecutionQuery += " AND " + tableFilter;
}
try (PreparedStatement taskExecutionQueryStatement = connection.prepareStatement(taskExecutionQuery)) {
taskExecutionQueryStatement.setString(1, jobId);
if (tableFilter.isPresent()) {
tableFilter.addParameters(taskExecutionQueryStatement, 2);
}
try (ResultSet taskRs = taskExecutionQueryStatement.executeQuery()) {
while (taskRs.next()) {
TaskExecutionInfo taskExecutionInfo = resultSetToTaskExecutionInfo(taskRs);
// Query task metrics for each task execution record
if (query.isIncludeTaskMetrics()) {
try (PreparedStatement taskMetricQueryStatement = connection
.prepareStatement(TASK_METRIC_QUERY_STATEMENT_TEMPLATE)) {
taskMetricQueryStatement.setString(1, taskExecutionInfo.getTaskId());
try (ResultSet taskMetricRs = taskMetricQueryStatement.executeQuery()) {
MetricArray taskMetrics = new MetricArray();
while (taskMetricRs.next()) {
taskMetrics.add(resultSetToMetric(taskMetricRs));
}
// Add task metrics
taskExecutionInfo.setMetrics(taskMetrics);
}
}
}
taskExecutionInfos.add(taskExecutionInfo);
// Query task properties
Set<String> queryTaskPropertyKeys = null;
if (query.hasTaskProperties()) {
queryTaskPropertyKeys = new HashSet<>(Arrays.asList(query.getTaskProperties().split(",")));
}
try (PreparedStatement taskPropertiesQueryStatement = connection
.prepareStatement(TASK_PROPERTY_QUERY_STATEMENT_TEMPLATE)) {
taskPropertiesQueryStatement.setString(1, taskExecutionInfo.getTaskId());
try (ResultSet taskPropertiesRs = taskPropertiesQueryStatement.executeQuery()) {
Map<String, String> taskProperties = Maps.newHashMap();
while (taskPropertiesRs.next()) {
Map.Entry<String, String> property = resultSetToProperty(taskPropertiesRs);
if (queryTaskPropertyKeys == null || queryTaskPropertyKeys.contains(property.getKey())) {
taskProperties.put(property.getKey(), property.getValue());
}
}
// Add job properties
taskExecutionInfo.setTaskProperties(new StringMap(taskProperties));
}
}
// Add task properties
}
// Add task execution information
jobExecutionInfo.setTaskExecutions(taskExecutionInfos);
}
}
}
return jobExecutionInfo;
}
}
}