private void addPropertiesToTasks()

in gobblin-metastore/src/main/java/org/apache/gobblin/metastore/database/DatabaseJobHistoryStoreV101.java [643:698]


  /**
   * Loads task properties from the database and attaches them to the given task execution infos.
   *
   * <p>Nothing is queried when {@code taskExecutionInfos} is empty, or when the query names task
   * properties but every comma-separated entry is empty. When the query names no task properties,
   * all properties of the matching jobs are loaded.
   *
   * @param connection connection used to prepare and run the task property query
   * @param query job execution query; its task properties string is split on {@code ","} and, if
   *        any non-empty keys remain, only those property keys are loaded
   * @param tableFilter optional extra filter; when present it is appended to the WHERE clause and
   *        asked to bind its own parameters after the job ID and property key parameters
   * @param taskExecutionInfos task execution infos keyed by job ID and then by task ID; matching
   *        entries are updated in place with the loaded properties
   * @throws SQLException if preparing, binding or executing the query fails
   */
  private void addPropertiesToTasks(Connection connection, JobExecutionQuery query, Filter tableFilter,
                                    Map<String, Map<String, TaskExecutionInfo>> taskExecutionInfos)
          throws SQLException {
    if (taskExecutionInfos.isEmpty()) {
      return;
    }

    // null means "no key restriction"; a non-null set restricts the lookup to those keys.
    Set<String> propertyKeys = null;
    if (query.hasTaskProperties()) {
      propertyKeys = Sets.newHashSet(Iterables.filter(Arrays.asList(query.getTaskProperties().split(",")),
              new Predicate<String>() {
                @Override
                public boolean apply(String input) {
                  return !Strings.isNullOrEmpty(input);
                }
              }));
      if (propertyKeys.isEmpty()) {
        // Task properties were requested but no usable key was given: nothing to load.
        return;
      }
    }
    // From here on propertyKeys is either null or non-empty.
    boolean restrictToKeys = propertyKeys != null;

    String template = String.format(TASK_PROPERTY_QUERY_STATEMENT_TEMPLATE,
            getInPredicate(taskExecutionInfos.size()));
    if (restrictToKeys) {
      // Leading space keeps this clause separated from whatever the base template ends with.
      template += String.format(" AND property_key IN (%s)", getInPredicate(propertyKeys.size()));
    }
    if (tableFilter.isPresent()) {
      template += " AND t." + tableFilter;
    }

    try (PreparedStatement taskPropertiesQueryStatement = connection.prepareStatement(template)) {
      // Bind parameters in the same order the placeholders were appended to the template.
      int index = 1;
      for (String jobId : taskExecutionInfos.keySet()) {
        taskPropertiesQueryStatement.setString(index++, jobId);
      }
      if (restrictToKeys) {
        for (String propertyKey : propertyKeys) {
          taskPropertiesQueryStatement.setString(index++, propertyKey);
        }
      }
      if (tableFilter.isPresent()) {
        tableFilter.addParameters(taskPropertiesQueryStatement, index);
      }

      try (ResultSet taskPropertiesRs = taskPropertiesQueryStatement.executeQuery()) {
        while (taskPropertiesRs.next()) {
          String jobId = taskPropertiesRs.getString("job_id");
          String taskId = taskPropertiesRs.getString("task_id");

          // A property row may refer to a task that was not loaded into the map; skip it
          // instead of failing the whole lookup with a NullPointerException.
          Map<String, TaskExecutionInfo> tasksOfJob = taskExecutionInfos.get(jobId);
          TaskExecutionInfo taskExecutionInfo = tasksOfJob == null ? null : tasksOfJob.get(taskId);
          if (taskExecutionInfo == null) {
            continue;
          }

          StringMap taskProperties = taskExecutionInfo.getTaskProperties(GetMode.NULL);
          if (taskProperties == null) {
            taskProperties = new StringMap();
            taskExecutionInfo.setTaskProperties(taskProperties);
          }

          Map.Entry<String, String> property = resultSetToProperty(taskPropertiesRs);
          // Exact-match check on the returned key, in addition to the SQL IN predicate.
          if (!restrictToKeys || propertyKeys.contains(property.getKey())) {
            taskProperties.put(property.getKey(), property.getValue());
          }
        }
      }
    }
  }