private TimelineEntities getEntityByTime()

in hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/LeveldbTimelineStore.java [602:743]


  /**
   * Retrieves the entities of one type whose keys start with the given base
   * prefix, scanning from the end of the time window towards its start.
   *
   * <p>The scan seeks to a key built from {@code base}, {@code entityType}
   * and either the start time of {@code fromId} (when that start time is not
   * after {@code endtime}) or {@code endtime} itself. It then walks forward
   * until the limit is reached, the iterator is exhausted, the key prefix no
   * longer matches, or a key sorting after the {@code starttime} boundary key
   * is seen.</p>
   *
   * @param base key prefix selecting the part of the database to scan
   * @param entityType entity type appended to {@code base} to form the
   *          lookup prefix
   * @param limit maximum number of entities to return; {@code DEFAULT_LIMIT}
   *          is used when {@code null}
   * @param starttime if not {@code null}, bounds the scan: iteration stops at
   *          the first key that sorts after the key built from this time
   * @param endtime time used to build the seek key; {@code null} is treated
   *          as {@link Long#MAX_VALUE}
   * @param fromId if not {@code null}, the scan starts at this entity,
   *          provided its start time is not after {@code endtime}; if no
   *          start time is known for the id an empty result is returned
   * @param fromTs if not {@code null}, entities whose insert time (read from
   *          the value of the entity's first key) is greater than this value
   *          are skipped
   * @param secondaryFilters if not {@code null}, every filter must match
   *          either the entity's other info or its primary filters for the
   *          entity to be returned
   * @param fields fields to load for each entity; {@code null} means all
   *          fields. The given set is never modified by this method.
   * @param checkAcl if not {@code null}, only entities accepted by
   *          {@code checkAcl.check} are returned
   * @return the matching entities, possibly empty, never {@code null}
   * @throws IOException if reading from the database fails; a
   *           {@code DBException} is rethrown wrapped in an
   *           {@code IOException}
   */
  private TimelineEntities getEntityByTime(byte[] base,
      String entityType, Long limit, Long starttime, Long endtime,
      String fromId, Long fromTs, Collection<NameValuePair> secondaryFilters,
      EnumSet<Field> fields, CheckAcl checkAcl) throws IOException {
    if (fields == null) {
      fields = EnumSet.allOf(Field.class);
    }
    // Even if other info and primary filter fields are not requested, we
    // still need to load them to match secondary filters when those are
    // non-empty. The flags remember which fields were added only for
    // matching, so they can be stripped from the returned entities.
    boolean addPrimaryFilters = false;
    boolean addOtherInfo = false;
    if (secondaryFilters != null && !secondaryFilters.isEmpty()) {
      addPrimaryFilters = !fields.contains(Field.PRIMARY_FILTERS);
      addOtherInfo = !fields.contains(Field.OTHER_INFO);
      if (addPrimaryFilters || addOtherInfo) {
        // extend a private copy so the caller's set is left untouched
        fields = EnumSet.copyOf(fields);
        fields.add(Field.PRIMARY_FILTERS);
        fields.add(Field.OTHER_INFO);
      }
    }

    LeveldbIterator iterator = null;
    try {
      KeyBuilder kb = KeyBuilder.newInstance().add(base).add(entityType);
      // only db keys matching the prefix (base + entity type) will be parsed
      byte[] prefix = kb.getBytesForLookup();
      if (endtime == null) {
        // if end time is null, place no restriction on end time
        endtime = Long.MAX_VALUE;
      }
      // construct a first key that will be seeked to using end time or fromId
      byte[] first = null;
      if (fromId != null) {
        Long fromIdStartTime = getStartTimeLong(fromId, entityType);
        if (fromIdStartTime == null) {
          // no start time for provided id, so return empty entities
          return new TimelineEntities();
        }
        if (fromIdStartTime <= endtime) {
          // if provided id's start time falls before the end of the window,
          // use it to construct the seek key
          first = kb.add(writeReverseOrderedLong(fromIdStartTime))
              .add(fromId).getBytesForLookup();
        }
      }
      // if seek key wasn't constructed using fromId, construct it using end ts
      if (first == null) {
        first = kb.add(writeReverseOrderedLong(endtime)).getBytesForLookup();
      }
      byte[] last = null;
      if (starttime != null) {
        // if start time is not null, set a last key that will not be
        // iterated past
        last = KeyBuilder.newInstance().add(base).add(entityType)
            .add(writeReverseOrderedLong(starttime)).getBytesForLookup();
      }
      if (limit == null) {
        // if limit is not specified, use the default
        limit = DEFAULT_LIMIT;
      }

      TimelineEntities entities = new TimelineEntities();
      iterator = new LeveldbIterator(db);
      iterator.seek(first);
      // iterate until one of the following conditions is met: limit is
      // reached, there are no more keys, the key prefix no longer matches,
      // or a start time has been specified and reached/exceeded
      while (entities.getEntities().size() < limit && iterator.hasNext()) {
        byte[] key = iterator.peekNext().getKey();
        if (!prefixMatches(prefix, prefix.length, key) || (last != null &&
            WritableComparator.compareBytes(key, 0, key.length, last, 0,
                last.length) > 0)) {
          break;
        }
        // read the start time and entity id from the current key
        KeyParser kp = new KeyParser(key, prefix.length);
        Long startTime = kp.getNextLong();
        String entityId = kp.getNextString();

        if (fromTs != null) {
          long insertTime = readReverseOrderedLong(iterator.peekNext()
              .getValue(), 0);
          if (insertTime > fromTs) {
            // The entity was inserted after fromTs: skip all of its keys.
            // Only peek once hasNext() has confirmed there is a next key,
            // so that reaching the end of the database while skipping ends
            // the scan instead of failing the whole query.
            int entityPrefixLength = kp.getOffset();
            while (iterator.hasNext() && prefixMatches(key,
                entityPrefixLength, iterator.peekNext().getKey())) {
              iterator.next();
            }
            continue;
          }
        }

        // parse the entity that owns this key, iterating over all keys for
        // the entity
        TimelineEntity entity = getEntity(entityId, entityType, startTime,
            fields, iterator, key, kp.getOffset());
        // determine if the retrieved entity matches the provided secondary
        // filters, and if so add it to the list of entities to return
        boolean filterPassed = true;
        if (secondaryFilters != null) {
          for (NameValuePair filter : secondaryFilters) {
            Object v = entity.getOtherInfo().get(filter.getName());
            if (v == null) {
              // not present in other info: fall back to the primary filters
              Set<Object> vs = entity.getPrimaryFilters()
                  .get(filter.getName());
              if (vs == null || !vs.contains(filter.getValue())) {
                filterPassed = false;
                break;
              }
            } else if (!v.equals(filter.getValue())) {
              filterPassed = false;
              break;
            }
          }
        }
        if (filterPassed) {
          if (entity.getDomainId() == null) {
            entity.setDomainId(DEFAULT_DOMAIN_ID);
          }
          if (checkAcl == null || checkAcl.check(entity)) {
            // Remove primary filter and other info if they are added for
            // matching secondary filters
            if (addPrimaryFilters) {
              entity.setPrimaryFilters(null);
            }
            if (addOtherInfo) {
              entity.setOtherInfo(null);
            }
            entities.addEntity(entity);
          }
        }
      }
      return entities;
    } catch (DBException e) {
      throw new IOException(e);
    } finally {
      IOUtils.cleanupWithLogger(LOG, iterator);
    }
  }