in gobblin-metastore/src/main/java/org/apache/gobblin/metastore/database/DatabaseJobHistoryStoreV101.java [643:698]
/**
 * Reads task properties from the database and attaches them to the already-loaded
 * {@link TaskExecutionInfo} objects in {@code taskExecutionInfos}.
 *
 * <p>No statement is executed when {@code taskExecutionInfos} is empty, or when the query names
 * task properties but every name is empty after splitting the list on commas. When the query does
 * not name any task properties, properties are loaded without a key restriction.
 *
 * @param connection connection used to prepare and execute the property statement
 * @param query supplies the optional comma-separated list of task property keys to load
 * @param tableFilter optional additional predicate; when present it is appended to the statement
 *        against the {@code t} alias and its parameters are added after the job ids and
 *        property keys
 * @param taskExecutionInfos task executions keyed by job id and then by task id; the matching
 *        entries are updated in place
 * @throws SQLException if preparing the statement, binding parameters, or reading results fails
 */
private void addPropertiesToTasks(Connection connection, JobExecutionQuery query, Filter tableFilter,
    Map<String, Map<String, TaskExecutionInfo>> taskExecutionInfos)
    throws SQLException {
  if (taskExecutionInfos.isEmpty()) {
    return;
  }

  // null means "no restriction on property keys"; a non-null set lists the only keys wanted.
  Set<String> propertyKeys = null;
  if (query.hasTaskProperties()) {
    propertyKeys = Sets.newHashSet(Iterables.filter(Arrays.asList(query.getTaskProperties().split(",")),
        new Predicate<String>() {
          @Override
          public boolean apply(String input) {
            return !Strings.isNullOrEmpty(input);
          }
        }));
    if (propertyKeys.isEmpty()) {
      // Properties were requested but no usable key was named, so there is nothing to load.
      return;
    }
  }

  StringBuilder template = new StringBuilder(
      String.format(TASK_PROPERTY_QUERY_STATEMENT_TEMPLATE, getInPredicate(taskExecutionInfos.size())));
  if (propertyKeys != null) {
    // Leading space keeps the clause separated from whatever the base template ends with.
    template.append(String.format(" AND property_key IN (%s)", getInPredicate(propertyKeys.size())));
  }
  if (tableFilter.isPresent()) {
    template.append(" AND t.").append(tableFilter);
  }

  try (PreparedStatement taskPropertiesQueryStatement = connection.prepareStatement(template.toString())) {
    // Bind order must follow placeholder order: job ids, then property keys, then filter values.
    int index = 1;
    for (String jobId : taskExecutionInfos.keySet()) {
      taskPropertiesQueryStatement.setString(index++, jobId);
    }
    if (propertyKeys != null) {
      for (String propertyKey : propertyKeys) {
        taskPropertiesQueryStatement.setString(index++, propertyKey);
      }
    }
    if (tableFilter.isPresent()) {
      tableFilter.addParameters(taskPropertiesQueryStatement, index);
    }

    try (ResultSet taskPropertiesRs = taskPropertiesQueryStatement.executeQuery()) {
      while (taskPropertiesRs.next()) {
        String jobId = taskPropertiesRs.getString("job_id");
        String taskId = taskPropertiesRs.getString("task_id");

        // The statement is parameterized by job id only, so a row may refer to a task that is
        // not in the supplied map. Skip it instead of failing the whole load with an NPE.
        Map<String, TaskExecutionInfo> tasksOfJob = taskExecutionInfos.get(jobId);
        TaskExecutionInfo taskExecutionInfo = tasksOfJob == null ? null : tasksOfJob.get(taskId);
        if (taskExecutionInfo == null) {
          continue;
        }

        StringMap taskProperties = taskExecutionInfo.getTaskProperties(GetMode.NULL);
        if (taskProperties == null) {
          taskProperties = new StringMap();
          taskExecutionInfo.setTaskProperties(taskProperties);
        }

        Map.Entry<String, String> property = resultSetToProperty(taskPropertiesRs);
        // Re-check the key in memory so only exactly-matching requested keys are stored.
        if (propertyKeys == null || propertyKeys.contains(property.getKey())) {
          taskProperties.put(property.getKey(), property.getValue());
        }
      }
    }
  }
}