private TimelineEntities getEntityByTime()

in hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/RollingLevelDBTimelineStore.java [688:852]


  /**
   * Retrieves the entities of one type whose keys fall inside a time window,
   * walking the rolling databases from the one selected for the first start
   * time towards the previous ones until the limit is reached or no database
   * is left.
   *
   * @param base
   *          bytes placed in front of the entity type when building the key
   *          prefix; only keys matching {@code base + entityType} are parsed
   * @param entityType
   *          type of the entities to return
   * @param limit
   *          maximum number of entities to return; {@code DEFAULT_LIMIT} is
   *          used when {@code null}
   * @param starttime
   *          when not {@code null}, iteration stops at the first key that
   *          sorts after the key built from this value
   * @param endtime
   *          used to build the seek key; {@code null} is treated as
   *          {@code Long.MAX_VALUE}
   * @param fromId
   *          when not {@code null}, the entity whose start time is looked up
   *          to build the seek key; an empty result is returned if no start
   *          time is known for it, and it is ignored if its start time is
   *          greater than {@code endtime}
   * @param fromTs
   *          when not {@code null}, entities whose stored value decodes (via
   *          {@code readReverseOrderedLong}) to a number greater than this are
   *          skipped
   * @param secondaryFilters
   *          name/value pairs that must all match either the entity's other
   *          info or its primary filters; may be {@code null}
   * @param fields
   *          fields to load for each entity; all fields when {@code null}
   * @param checkAcl
   *          access check applied to each candidate entity; no check is done
   *          when {@code null}
   * @param usingPrimaryFilter
   *          {@code true} to iterate {@code indexdb} and load each entity by
   *          id, {@code false} to iterate {@code entitydb} and parse each
   *          entity from the iterator
   * @return the matching entities, possibly empty
   * @throws IOException
   *           declared for failures of the underlying store calls
   */
  private TimelineEntities getEntityByTime(byte[] base, String entityType,
      Long limit, Long starttime, Long endtime, String fromId, Long fromTs,
      Collection<NameValuePair> secondaryFilters, EnumSet<Field> fields,
      CheckAcl checkAcl, boolean usingPrimaryFilter) throws IOException {
    KeyBuilder kb = KeyBuilder.newInstance().add(base).add(entityType);
    // only db keys matching the prefix (base + entity type) will be parsed
    byte[] prefix = kb.getBytesForLookup();
    if (endtime == null) {
      // if end time is null, place no restriction on end time
      endtime = Long.MAX_VALUE;
    }

    // Sanitize the fields parameter
    if (fields == null) {
      fields = EnumSet.allOf(Field.class);
    }

    // construct a first key that will be seeked to using end time or fromId
    long firstStartTime = Long.MAX_VALUE;
    byte[] first = null;
    if (fromId != null) {
      Long fromIdStartTime = getStartTimeLong(fromId, entityType);
      if (fromIdStartTime == null) {
        // no start time for provided id, so return empty entities
        return new TimelineEntities();
      }
      if (fromIdStartTime <= endtime) {
        // if provided id's start time falls before the end of the window,
        // use it to construct the seek key
        firstStartTime = fromIdStartTime;
        first = kb.add(writeReverseOrderedLong(fromIdStartTime)).add(fromId)
            .getBytesForLookup();
      }
    }
    // if seek key wasn't constructed using fromId, construct it using end ts
    if (first == null) {
      firstStartTime = endtime;
      first = kb.add(writeReverseOrderedLong(endtime)).getBytesForLookup();
    }
    byte[] last = null;
    if (starttime != null) {
      // if start time is not null, set a last key that will not be
      // iterated past
      last = KeyBuilder.newInstance().add(base).add(entityType)
          .add(writeReverseOrderedLong(starttime)).getBytesForLookup();
    }
    if (limit == null) {
      // if limit is not specified, use the default
      limit = DEFAULT_LIMIT;
    }

    TimelineEntities entities = new TimelineEntities();
    // the index db is iterated for primary filter queries, the entity db
    // otherwise
    RollingLevelDB rollingdb = usingPrimaryFilter ? indexdb : entitydb;

    DB db = rollingdb.getDBForStartTime(firstStartTime);
    while (entities.getEntities().size() < limit && db != null) {
      try (DBIterator iterator = db.iterator()) {
        iterator.seek(first);

        // iterate until one of the following conditions is met: limit is
        // reached, there are no more keys, the key prefix no longer matches,
        // or a start time has been specified and reached/exceeded
        while (entities.getEntities().size() < limit && iterator.hasNext()) {
          byte[] key = iterator.peekNext().getKey();
          if (!prefixMatches(prefix, prefix.length, key)
              || (last != null && WritableComparator.compareBytes(key, 0,
              key.length, last, 0, last.length) > 0)) {
            break;
          }
          // read the start time and entity id from the current key
          KeyParser kp = new KeyParser(key, prefix.length);
          Long startTime = kp.getNextLong();
          String entityId = kp.getNextString();

          if (fromTs != null) {
            long insertTime = readReverseOrderedLong(iterator.peekNext()
                .getValue(), 0);
            if (insertTime > fromTs) {
              // Skip every key that belongs to this entity. The current key
              // is consumed unconditionally so the iterator always advances;
              // after that each key is peeked and tested BEFORE it is
              // consumed, so the first key of the following entity is left
              // in place for the outer loop instead of being swallowed.
              byte[] firstKey = key;
              int entityPrefixLen = kp.getOffset();
              iterator.next();
              while (iterator.hasNext() && prefixMatches(firstKey,
                  entityPrefixLen, iterator.peekNext().getKey())) {
                iterator.next();
              }
              continue;
            }
          }
          // Even if other info and primary filter fields are not included, we
          // still need to load them to match secondary filters when they are
          // non-empty
          EnumSet<Field> queryFields = EnumSet.copyOf(fields);
          boolean addPrimaryFilters = false;
          boolean addOtherInfo = false;
          if (secondaryFilters != null && !secondaryFilters.isEmpty()) {
            if (!queryFields.contains(Field.PRIMARY_FILTERS)) {
              queryFields.add(Field.PRIMARY_FILTERS);
              addPrimaryFilters = true;
            }
            if (!queryFields.contains(Field.OTHER_INFO)) {
              queryFields.add(Field.OTHER_INFO);
              addOtherInfo = true;
            }
          }

          // parse the entity that owns this key, iterating over all keys for
          // the entity
          TimelineEntity entity = null;
          if (usingPrimaryFilter) {
            // the entity is loaded by id, so only the current key has to be
            // stepped over here
            entity = getEntity(entityId, entityType, queryFields);
            iterator.next();
          } else {
            // the iterator is handed to getEntity together with the key and
            // the offset just past the entity id
            // NOTE(review): presumably getEntity advances the iterator past
            // this entity's keys -- confirm against its implementation
            entity = getEntity(entityId, entityType, startTime, queryFields,
                iterator, key, kp.getOffset());
          }

          if (entity != null) {
            // determine if the retrieved entity matches the provided secondary
            // filters, and if so add it to the list of entities to return
            boolean filterPassed = true;
            if (secondaryFilters != null) {
              for (NameValuePair filter : secondaryFilters) {
                // a filter is looked up in other info first and falls back
                // to the primary filters only when other info has no value
                Object v = entity.getOtherInfo().get(filter.getName());
                if (v == null) {
                  Set<Object> vs = entity.getPrimaryFilters()
                          .get(filter.getName());
                  if (vs == null || !vs.contains(filter.getValue())) {
                    filterPassed = false;
                    break;
                  }
                } else if (!v.equals(filter.getValue())) {
                  filterPassed = false;
                  break;
                }
              }
            }
            if (filterPassed) {
              // the domain id is defaulted before the ACL check runs
              if (entity.getDomainId() == null) {
                entity.setDomainId(DEFAULT_DOMAIN_ID);
              }
              if (checkAcl == null || checkAcl.check(entity)) {
                // Remove primary filter and other info if they are added for
                // matching secondary filters
                if (addPrimaryFilters) {
                  entity.setPrimaryFilters(null);
                }
                if (addOtherInfo) {
                  entity.setOtherInfo(null);
                }
                entities.addEntity(entity);
              }
            }
          }
        }
        // continue with the previous rolling db; the loop ends when there is
        // none or the limit has been reached
        db = rollingdb.getPreviousDB(db);
      }
    }
    return entities;
  }