private long putEntities()

in hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/RollingLevelDBTimelineStore.java [865:1155]


  /**
   * Stages the writes for a single timeline entity into the supplied
   * per-period write batches. This method only adds entries to
   * {@link WriteBatch} objects; it does not write them to the DB itself.
   *
   * <p>The following entries are staged, in order: the entity marker, the
   * domain id, one entry per event, one per primary filter value, one per
   * "other info" key, one "forward" entry per related entity, and finally the
   * primary filter index entries. Related entities that have no known start
   * time yet are handled last: they are given this entity's start time and
   * domain id, and their marker, domain and relation entries are staged.
   *
   * <p>Failures are not thrown; they are reported by appending a
   * {@link TimelinePutError} (always carrying this entity's id and type) to
   * {@code response}:
   * <ul>
   *   <li>{@code NO_START_TIME} - no start time could be determined;</li>
   *   <li>{@code NO_DOMAIN} - the entity has an empty domain id;</li>
   *   <li>{@code EXPIRED_ENTITY} - no rolling DB exists for the start time of
   *       the entity (or of one of its related entities);</li>
   *   <li>{@code FORBIDDEN_RELATION} - a related entity belongs to a
   *       different domain; the entity is still put, the relation is
   *       skipped;</li>
   *   <li>{@code IO_EXCEPTION} - an {@link IOException} occurred.</li>
   * </ul>
   *
   * @param entityUpdates pending entity-DB batches keyed by rounded start
   *          time; batches are looked up here and added when missing
   * @param indexUpdates pending index-DB batches keyed by rounded start
   *          time; batches are looked up here and added when missing
   * @param entity the entity to store
   * @param response collector for per-entity put errors
   * @return the number of entries staged by this call
   */
  private long putEntities(TreeMap<Long, RollingWriteBatch> entityUpdates,
      TreeMap<Long, RollingWriteBatch> indexUpdates, TimelineEntity entity,
      TimelinePutResponse response) {

    long putCount = 0;
    List<EntityIdentifier> relatedEntitiesWithoutStartTimes =
        new ArrayList<EntityIdentifier>();
    // Kept outside the try block because the deferred related-entity pass
    // below reads the entity start time back out of it.
    byte[] revStartTime = null;
    try {
      List<TimelineEvent> events = entity.getEvents();
      // look up the start time for the entity
      Long startTime = getAndSetStartTime(entity.getEntityId(),
          entity.getEntityType(), entity.getStartTime(), events);
      if (startTime == null) {
        // if no start time is found, add an error and return
        addPutError(response, entity, TimelinePutError.NO_START_TIME);
        return putCount;
      }

      // Must have a domain
      if (StringUtils.isEmpty(entity.getDomainId())) {
        addPutError(response, entity, TimelinePutError.NO_DOMAIN);
        return putCount;
      }

      revStartTime = writeReverseOrderedLong(startTime);
      long roundedStartTime = entitydb.computeCurrentCheckMillis(startTime);
      RollingWriteBatch rollingWriteBatch = getOrCreateEntityBatch(
          entityUpdates, roundedStartTime, startTime);
      if (rollingWriteBatch == null) {
        // no rolling DB is available for this start time: report the entity
        // as expired and return
        addPutError(response, entity, TimelinePutError.EXPIRED_ENTITY);
        return putCount;
      }
      WriteBatch writeBatch = rollingWriteBatch.getWriteBatch();

      // Save off the getBytes conversion to avoid unnecessary cost
      byte[] entityIdBytes = entity.getEntityId().getBytes(UTF_8);
      byte[] entityTypeBytes = entity.getEntityType().getBytes(UTF_8);
      byte[] domainIdBytes = entity.getDomainId().getBytes(UTF_8);

      // write entity marker
      byte[] markerKey = KeyBuilder.newInstance(3).add(entityTypeBytes, true)
          .add(revStartTime).add(entityIdBytes, true).getBytesForLookup();
      writeBatch.put(markerKey, EMPTY_BYTES);
      ++putCount;

      // write domain id entry
      byte[] domainkey = KeyBuilder.newInstance(4).add(entityTypeBytes, true)
          .add(revStartTime).add(entityIdBytes, true).add(DOMAIN_ID_COLUMN)
          .getBytes();
      writeBatch.put(domainkey, domainIdBytes);
      ++putCount;

      // write event entries
      if (events != null) {
        for (TimelineEvent event : events) {
          byte[] revts = writeReverseOrderedLong(event.getTimestamp());
          byte[] key = KeyBuilder.newInstance().add(entityTypeBytes, true)
              .add(revStartTime).add(entityIdBytes, true).add(EVENTS_COLUMN)
              .add(revts).add(event.getEventType().getBytes(UTF_8)).getBytes();
          byte[] value = fstConf.asByteArray(event.getEventInfo());
          writeBatch.put(key, value);
          ++putCount;
        }
      }

      // write primary filter entries
      Map<String, Set<Object>> primaryFilters = entity.getPrimaryFilters();
      if (primaryFilters != null) {
        for (Entry<String, Set<Object>> primaryFilter : primaryFilters
            .entrySet()) {
          for (Object primaryFilterValue : primaryFilter.getValue()) {
            byte[] key = KeyBuilder.newInstance(6).add(entityTypeBytes, true)
                .add(revStartTime).add(entityIdBytes, true)
                .add(PRIMARY_FILTERS_COLUMN).add(primaryFilter.getKey())
                .add(fstConf.asByteArray(primaryFilterValue)).getBytes();
            writeBatch.put(key, EMPTY_BYTES);
            ++putCount;
          }
        }
      }

      // write other info entries
      Map<String, Object> otherInfo = entity.getOtherInfo();
      if (otherInfo != null) {
        for (Entry<String, Object> info : otherInfo.entrySet()) {
          byte[] key = KeyBuilder.newInstance(5).add(entityTypeBytes, true)
              .add(revStartTime).add(entityIdBytes, true)
              .add(OTHER_INFO_COLUMN).add(info.getKey()).getBytes();
          byte[] value = fstConf.asByteArray(info.getValue());
          writeBatch.put(key, value);
          ++putCount;
        }
      }

      // write related entity entries
      Map<String, Set<String>> relatedEntities = entity.getRelatedEntities();
      if (relatedEntities != null) {
        for (Entry<String, Set<String>> relatedEntityList : relatedEntities
            .entrySet()) {
          String relatedEntityType = relatedEntityList.getKey();
          for (String relatedEntityId : relatedEntityList.getValue()) {
            // look up start time of related entity
            Long relatedStartTimeLong = getStartTimeLong(relatedEntityId,
                relatedEntityType);
            // delay writing the related entity if no start time is found
            if (relatedStartTimeLong == null) {
              relatedEntitiesWithoutStartTimes.add(new EntityIdentifier(
                  relatedEntityId, relatedEntityType));
              continue;
            }

            byte[] relatedEntityStartTime =
                writeReverseOrderedLong(relatedStartTimeLong);
            long relatedRoundedStartTime = entitydb
                .computeCurrentCheckMillis(relatedStartTimeLong);
            RollingWriteBatch relatedRollingWriteBatch =
                getOrCreateEntityBatch(entityUpdates, relatedRoundedStartTime,
                    relatedStartTimeLong);
            if (relatedRollingWriteBatch == null) {
              // no rolling DB is available for the related entity's start
              // time: record an error and skip only this relation
              addPutError(response, entity, TimelinePutError.EXPIRED_ENTITY);
              continue;
            }
            // This is the existing entity
            byte[] relatedDomainIdBytes = relatedRollingWriteBatch.getDB().get(
                createDomainIdKey(relatedEntityId, relatedEntityType,
                    relatedEntityStartTime));
            // The timeline data created by the server before 2.6 won't have
            // the domain field. We assume this timeline data is in the
            // default timeline domain.
            String domainId = null;
            if (relatedDomainIdBytes == null) {
              domainId = TimelineDataManager.DEFAULT_DOMAIN_ID;
            } else {
              domainId = new String(relatedDomainIdBytes, UTF_8);
            }
            if (!domainId.equals(entity.getDomainId())) {
              // in this case the entity will be put, but the relation will be
              // ignored
              addPutError(response, entity,
                  TimelinePutError.FORBIDDEN_RELATION);
              continue;
            }
            // write "forward" entry (related entity -> entity)
            byte[] key = createRelatedEntityKey(relatedEntityId,
                relatedEntityType, relatedEntityStartTime,
                entity.getEntityId(), entity.getEntityType());
            WriteBatch relatedWriteBatch = relatedRollingWriteBatch
                .getWriteBatch();
            relatedWriteBatch.put(key, EMPTY_BYTES);
            ++putCount;
          }
        }
      }

      // write index entities; the index batches are keyed by the same
      // rounded start time as the entity batches but come from indexdb
      RollingWriteBatch indexRollingWriteBatch = indexUpdates
          .get(roundedStartTime);
      if (indexRollingWriteBatch == null) {
        DB db = indexdb.getDBForStartTime(startTime);
        if (db != null) {
          WriteBatch indexWriteBatch = db.createWriteBatch();
          indexRollingWriteBatch = new RollingWriteBatch(db, indexWriteBatch);
          indexUpdates.put(roundedStartTime, indexRollingWriteBatch);
        }
      }
      if (indexRollingWriteBatch == null) {
        // no rolling index DB is available for this start time: report the
        // entity as expired and return
        addPutError(response, entity, TimelinePutError.EXPIRED_ENTITY);
        return putCount;
      }
      WriteBatch indexWriteBatch = indexRollingWriteBatch.getWriteBatch();
      putCount += writePrimaryFilterEntries(indexWriteBatch, primaryFilters,
          markerKey, EMPTY_BYTES);
    } catch (IOException e) {
      LOG.error("Error putting entity " + entity.getEntityId() + " of type "
          + entity.getEntityType(), e);
      addPutError(response, entity, TimelinePutError.IO_EXCEPTION);
    }

    // Deferred pass: related entities that had no start time of their own.
    // The list is only ever appended to after revStartTime has been assigned,
    // so revStartTime is non-null whenever this loop body runs.
    for (EntityIdentifier relatedEntity : relatedEntitiesWithoutStartTimes) {
      try {
        // suggest this entity's start time as the related entity's start time
        Long relatedEntityStartAndInsertTime = getAndSetStartTime(
            relatedEntity.getId(), relatedEntity.getType(),
            readReverseOrderedLong(revStartTime, 0), null);
        if (relatedEntityStartAndInsertTime == null) {
          throw new IOException("Error setting start time for related entity");
        }
        long relatedStartTimeLong = relatedEntityStartAndInsertTime;
        long relatedRoundedStartTime = entitydb
            .computeCurrentCheckMillis(relatedStartTimeLong);
        RollingWriteBatch relatedRollingWriteBatch = getOrCreateEntityBatch(
            entityUpdates, relatedRoundedStartTime, relatedStartTimeLong);
        if (relatedRollingWriteBatch == null) {
          // no rolling DB is available for the related entity's start time:
          // record an error and move on to the next related entity
          addPutError(response, entity, TimelinePutError.EXPIRED_ENTITY);
          continue;
        }
        WriteBatch relatedWriteBatch = relatedRollingWriteBatch.getWriteBatch();
        byte[] relatedEntityStartTime =
            writeReverseOrderedLong(relatedEntityStartAndInsertTime);
        // This is the new entity, the domain should be the same
        byte[] key = createDomainIdKey(relatedEntity.getId(),
            relatedEntity.getType(), relatedEntityStartTime);
        relatedWriteBatch.put(key, entity.getDomainId().getBytes(UTF_8));
        ++putCount;
        relatedWriteBatch.put(
            createRelatedEntityKey(relatedEntity.getId(),
                relatedEntity.getType(), relatedEntityStartTime,
                entity.getEntityId(), entity.getEntityType()), EMPTY_BYTES);
        ++putCount;
        relatedWriteBatch.put(
            createEntityMarkerKey(relatedEntity.getId(),
                relatedEntity.getType(), relatedEntityStartTime), EMPTY_BYTES);
        ++putCount;
      } catch (IOException e) {
        LOG.error(
            "Error putting related entity " + relatedEntity.getId()
                + " of type " + relatedEntity.getType() + " for entity "
                + entity.getEntityId() + " of type " + entity.getEntityType(),
            e);
        addPutError(response, entity, TimelinePutError.IO_EXCEPTION);
      }
    }

    return putCount;
  }

  /**
   * Appends a put error for {@code entity} with the given error code to
   * {@code response}.
   *
   * @param response the response the error is added to
   * @param entity the entity whose id and type identify the error
   * @param errorCode one of the {@link TimelinePutError} error code constants
   */
  private void addPutError(TimelinePutResponse response,
      TimelineEntity entity, int errorCode) {
    TimelinePutError error = new TimelinePutError();
    error.setEntityId(entity.getEntityId());
    error.setEntityType(entity.getEntityType());
    error.setErrorCode(errorCode);
    response.addError(error);
  }

  /**
   * Returns the pending entity-DB batch registered under
   * {@code roundedStartTime}, creating and registering a new one when none
   * exists yet. The entity DB is only consulted when no batch is registered.
   *
   * @param entityUpdates pending entity-DB batches keyed by rounded start time
   * @param roundedStartTime the key to look up, as computed by
   *          {@code entitydb.computeCurrentCheckMillis(startTime)}
   * @param startTime the start time used to select the DB
   * @return the existing or newly created batch, or {@code null} when
   *         {@code entitydb} has no DB for {@code startTime}
   * @throws IOException declared so that any I/O failure from the DB lookup
   *           reaches the callers' existing handlers
   */
  private RollingWriteBatch getOrCreateEntityBatch(
      TreeMap<Long, RollingWriteBatch> entityUpdates, long roundedStartTime,
      long startTime) throws IOException {
    RollingWriteBatch rollingWriteBatch = entityUpdates.get(roundedStartTime);
    if (rollingWriteBatch == null) {
      DB db = entitydb.getDBForStartTime(startTime);
      if (db != null) {
        WriteBatch writeBatch = db.createWriteBatch();
        rollingWriteBatch = new RollingWriteBatch(db, writeBatch);
        entityUpdates.put(roundedStartTime, rollingWriteBatch);
      }
    }
    return rollingWriteBatch;
  }