in hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/LeveldbTimelineStore.java [760:954]
private void put(TimelineEntity entity, TimelinePutResponse response,
boolean allowEmptyDomainId) {
LockMap.CountingReentrantLock<EntityIdentifier> lock =
writeLocks.getLock(new EntityIdentifier(entity.getEntityId(),
entity.getEntityType()));
lock.lock();
WriteBatch writeBatch = null;
List<EntityIdentifier> relatedEntitiesWithoutStartTimes =
new ArrayList<EntityIdentifier>();
byte[] revStartTime = null;
Map<String, Set<Object>> primaryFilters = null;
try {
writeBatch = db.createWriteBatch();
List<TimelineEvent> events = entity.getEvents();
// look up the start time for the entity
StartAndInsertTime startAndInsertTime = getAndSetStartTime(
entity.getEntityId(), entity.getEntityType(),
entity.getStartTime(), events);
if (startAndInsertTime == null) {
// if no start time is found, add an error and return
handleError(entity, response, TimelinePutError.NO_START_TIME);
return;
}
revStartTime = writeReverseOrderedLong(startAndInsertTime
.startTime);
primaryFilters = entity.getPrimaryFilters();
// write entity marker
byte[] markerKey = createEntityMarkerKey(entity.getEntityId(),
entity.getEntityType(), revStartTime);
byte[] markerValue = writeReverseOrderedLong(startAndInsertTime
.insertTime);
writeBatch.put(markerKey, markerValue);
writePrimaryFilterEntries(writeBatch, primaryFilters, markerKey,
markerValue);
// write event entries
if (events != null && !events.isEmpty()) {
for (TimelineEvent event : events) {
byte[] revts = writeReverseOrderedLong(event.getTimestamp());
byte[] key = createEntityEventKey(entity.getEntityId(),
entity.getEntityType(), revStartTime, revts,
event.getEventType());
byte[] value = GenericObjectMapper.write(event.getEventInfo());
writeBatch.put(key, value);
writePrimaryFilterEntries(writeBatch, primaryFilters, key, value);
}
}
// write related entity entries
Map<String, Set<String>> relatedEntities =
entity.getRelatedEntities();
if (relatedEntities != null && !relatedEntities.isEmpty()) {
for (Entry<String, Set<String>> relatedEntityList :
relatedEntities.entrySet()) {
String relatedEntityType = relatedEntityList.getKey();
for (String relatedEntityId : relatedEntityList.getValue()) {
// invisible "reverse" entries (entity -> related entity)
byte[] key = createReverseRelatedEntityKey(entity.getEntityId(),
entity.getEntityType(), revStartTime, relatedEntityId,
relatedEntityType);
writeBatch.put(key, EMPTY_BYTES);
// look up start time of related entity
byte[] relatedEntityStartTime = getStartTime(relatedEntityId,
relatedEntityType);
// delay writing the related entity if no start time is found
if (relatedEntityStartTime == null) {
relatedEntitiesWithoutStartTimes.add(
new EntityIdentifier(relatedEntityId, relatedEntityType));
continue;
} else {
// This is the existing entity
byte[] domainIdBytes = db.get(createDomainIdKey(
relatedEntityId, relatedEntityType, relatedEntityStartTime));
// The timeline data created by the server before 2.6 won't have
// the domain field. We assume this timeline data is in the
// default timeline domain.
String domainId = null;
if (domainIdBytes == null) {
domainId = TimelineDataManager.DEFAULT_DOMAIN_ID;
} else {
domainId = new String(domainIdBytes, StandardCharsets.UTF_8);
}
if (!domainId.equals(entity.getDomainId())) {
// in this case the entity will be put, but the relation will be
// ignored
handleError(entity, response, TimelinePutError.FORBIDDEN_RELATION);
continue;
}
}
// write "forward" entry (related entity -> entity)
key = createRelatedEntityKey(relatedEntityId,
relatedEntityType, relatedEntityStartTime,
entity.getEntityId(), entity.getEntityType());
writeBatch.put(key, EMPTY_BYTES);
}
}
}
// write primary filter entries
if (primaryFilters != null && !primaryFilters.isEmpty()) {
for (Entry<String, Set<Object>> primaryFilter :
primaryFilters.entrySet()) {
for (Object primaryFilterValue : primaryFilter.getValue()) {
byte[] key = createPrimaryFilterKey(entity.getEntityId(),
entity.getEntityType(), revStartTime,
primaryFilter.getKey(), primaryFilterValue);
writeBatch.put(key, EMPTY_BYTES);
writePrimaryFilterEntries(writeBatch, primaryFilters, key,
EMPTY_BYTES);
}
}
}
// write other info entries
Map<String, Object> otherInfo = entity.getOtherInfo();
if (otherInfo != null && !otherInfo.isEmpty()) {
for (Entry<String, Object> i : otherInfo.entrySet()) {
byte[] key = createOtherInfoKey(entity.getEntityId(),
entity.getEntityType(), revStartTime, i.getKey());
byte[] value = GenericObjectMapper.write(i.getValue());
writeBatch.put(key, value);
writePrimaryFilterEntries(writeBatch, primaryFilters, key, value);
}
}
// write domain id entry
byte[] key = createDomainIdKey(entity.getEntityId(),
entity.getEntityType(), revStartTime);
if (entity.getDomainId() == null ||
entity.getDomainId().length() == 0) {
if (!allowEmptyDomainId) {
handleError(entity, response, TimelinePutError.NO_DOMAIN);
return;
}
} else {
writeBatch.put(key, entity.getDomainId().getBytes(StandardCharsets.UTF_8));
writePrimaryFilterEntries(writeBatch, primaryFilters, key,
entity.getDomainId().getBytes(StandardCharsets.UTF_8));
}
db.write(writeBatch);
} catch (DBException de) {
LOG.error("Error putting entity " + entity.getEntityId() +
" of type " + entity.getEntityType(), de);
handleError(entity, response, TimelinePutError.IO_EXCEPTION);
} catch (IOException e) {
LOG.error("Error putting entity " + entity.getEntityId() +
" of type " + entity.getEntityType(), e);
handleError(entity, response, TimelinePutError.IO_EXCEPTION);
} finally {
lock.unlock();
writeLocks.returnLock(lock);
IOUtils.cleanupWithLogger(LOG, writeBatch);
}
for (EntityIdentifier relatedEntity : relatedEntitiesWithoutStartTimes) {
lock = writeLocks.getLock(relatedEntity);
lock.lock();
try {
StartAndInsertTime relatedEntityStartAndInsertTime =
getAndSetStartTime(relatedEntity.getId(), relatedEntity.getType(),
readReverseOrderedLong(revStartTime, 0), null);
if (relatedEntityStartAndInsertTime == null) {
throw new IOException("Error setting start time for related entity");
}
byte[] relatedEntityStartTime = writeReverseOrderedLong(
relatedEntityStartAndInsertTime.startTime);
// This is the new entity, the domain should be the same
byte[] key = createDomainIdKey(relatedEntity.getId(),
relatedEntity.getType(), relatedEntityStartTime);
db.put(key, entity.getDomainId().getBytes(StandardCharsets.UTF_8));
db.put(createRelatedEntityKey(relatedEntity.getId(),
relatedEntity.getType(), relatedEntityStartTime,
entity.getEntityId(), entity.getEntityType()), EMPTY_BYTES);
db.put(createEntityMarkerKey(relatedEntity.getId(),
relatedEntity.getType(), relatedEntityStartTime),
writeReverseOrderedLong(relatedEntityStartAndInsertTime
.insertTime));
} catch (DBException de) {
LOG.error("Error putting related entity " + relatedEntity.getId() +
" of type " + relatedEntity.getType() + " for entity " +
entity.getEntityId() + " of type " + entity.getEntityType(), de);
handleError(entity, response, TimelinePutError.IO_EXCEPTION);
} catch (IOException e) {
LOG.error("Error putting related entity " + relatedEntity.getId() +
" of type " + relatedEntity.getType() + " for entity " +
entity.getEntityId() + " of type " + entity.getEntityType(), e);
handleError(entity, response, TimelinePutError.IO_EXCEPTION);
} finally {
lock.unlock();
writeLocks.returnLock(lock);
}
}
}