in hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/LeveldbTimelineStore.java [602:743]
private TimelineEntities getEntityByTime(byte[] base,
String entityType, Long limit, Long starttime, Long endtime,
String fromId, Long fromTs, Collection<NameValuePair> secondaryFilters,
EnumSet<Field> fields, CheckAcl checkAcl) throws IOException {
// Even if other info and primary filter fields are not included, we
// still need to load them to match secondary filters when they are
// non-empty
if (fields == null) {
fields = EnumSet.allOf(Field.class);
}
boolean addPrimaryFilters = false;
boolean addOtherInfo = false;
if (secondaryFilters != null && secondaryFilters.size() > 0) {
if (!fields.contains(Field.PRIMARY_FILTERS)) {
fields.add(Field.PRIMARY_FILTERS);
addPrimaryFilters = true;
}
if (!fields.contains(Field.OTHER_INFO)) {
fields.add(Field.OTHER_INFO);
addOtherInfo = true;
}
}
LeveldbIterator iterator = null;
try {
KeyBuilder kb = KeyBuilder.newInstance().add(base).add(entityType);
// only db keys matching the prefix (base + entity type) will be parsed
byte[] prefix = kb.getBytesForLookup();
if (endtime == null) {
// if end time is null, place no restriction on end time
endtime = Long.MAX_VALUE;
}
// construct a first key that will be seeked to using end time or fromId
byte[] first = null;
if (fromId != null) {
Long fromIdStartTime = getStartTimeLong(fromId, entityType);
if (fromIdStartTime == null) {
// no start time for provided id, so return empty entities
return new TimelineEntities();
}
if (fromIdStartTime <= endtime) {
// if provided id's start time falls before the end of the window,
// use it to construct the seek key
first = kb.add(writeReverseOrderedLong(fromIdStartTime))
.add(fromId).getBytesForLookup();
}
}
// if seek key wasn't constructed using fromId, construct it using end ts
if (first == null) {
first = kb.add(writeReverseOrderedLong(endtime)).getBytesForLookup();
}
byte[] last = null;
if (starttime != null) {
// if start time is not null, set a last key that will not be
// iterated past
last = KeyBuilder.newInstance().add(base).add(entityType)
.add(writeReverseOrderedLong(starttime)).getBytesForLookup();
}
if (limit == null) {
// if limit is not specified, use the default
limit = DEFAULT_LIMIT;
}
TimelineEntities entities = new TimelineEntities();
iterator = new LeveldbIterator(db);
iterator.seek(first);
// iterate until one of the following conditions is met: limit is
// reached, there are no more keys, the key prefix no longer matches,
// or a start time has been specified and reached/exceeded
while (entities.getEntities().size() < limit && iterator.hasNext()) {
byte[] key = iterator.peekNext().getKey();
if (!prefixMatches(prefix, prefix.length, key) || (last != null &&
WritableComparator.compareBytes(key, 0, key.length, last, 0,
last.length) > 0)) {
break;
}
// read the start time and entity id from the current key
KeyParser kp = new KeyParser(key, prefix.length);
Long startTime = kp.getNextLong();
String entityId = kp.getNextString();
if (fromTs != null) {
long insertTime = readReverseOrderedLong(iterator.peekNext()
.getValue(), 0);
if (insertTime > fromTs) {
byte[] firstKey = key;
while (iterator.hasNext() && prefixMatches(firstKey,
kp.getOffset(), key)) {
iterator.next();
key = iterator.peekNext().getKey();
}
continue;
}
}
// parse the entity that owns this key, iterating over all keys for
// the entity
TimelineEntity entity = getEntity(entityId, entityType, startTime,
fields, iterator, key, kp.getOffset());
// determine if the retrieved entity matches the provided secondary
// filters, and if so add it to the list of entities to return
boolean filterPassed = true;
if (secondaryFilters != null) {
for (NameValuePair filter : secondaryFilters) {
Object v = entity.getOtherInfo().get(filter.getName());
if (v == null) {
Set<Object> vs = entity.getPrimaryFilters()
.get(filter.getName());
if (vs == null || !vs.contains(filter.getValue())) {
filterPassed = false;
break;
}
} else if (!v.equals(filter.getValue())) {
filterPassed = false;
break;
}
}
}
if (filterPassed) {
if (entity.getDomainId() == null) {
entity.setDomainId(DEFAULT_DOMAIN_ID);
}
if (checkAcl == null || checkAcl.check(entity)) {
// Remove primary filter and other info if they are added for
// matching secondary filters
if (addPrimaryFilters) {
entity.setPrimaryFilters(null);
}
if (addOtherInfo) {
entity.setOtherInfo(null);
}
entities.addEntity(entity);
}
}
}
return entities;
} catch(DBException e) {
throw new IOException(e);
} finally {
IOUtils.cleanupWithLogger(LOG, iterator);
}
}