in webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java [238:346]
/**
 * Applies one hook notification to the entity store, retrying on failure.
 *
 * <p>The message is dispatched on its type (create, partial update, delete, full update).
 * Each attempt runs inside a fresh {@link RequestContext} carrying the message's user, and
 * both request contexts are cleared when the attempt ends, whether it succeeded or not.
 * The audit record is written on the first attempt only, so retries do not duplicate it.
 *
 * <p>Any {@link Throwable} raised by an attempt is caught. While attempts remain, the thread
 * sleeps for {@code consumerRetryInterval} ms and tries again. When the last attempt fails,
 * the message is added to {@code failedMessages} (flushed through
 * {@code recordFailedMessages()} once {@code failedMsgCacheSize} is reached) and the method
 * returns <em>without</em> committing the Kafka message.
 *
 * <p>NOTE(review): if {@code maxRetries} is not positive the loop body never runs and the
 * message is committed unprocessed — confirm the configuration guarantees a positive value.
 *
 * @param kafkaMsg the Kafka envelope holding the hook notification; committed on success
 * @throws AtlasServiceException declared by the signature; failures inside the retry loop are
 *                               caught and retried rather than propagated
 * @throws AtlasException        declared by the signature; see above
 */
void handleMessage(AtlasKafkaMessage<HookNotificationMessage> kafkaMsg) throws AtlasServiceException, AtlasException {
    HookNotificationMessage message     = kafkaMsg.getMessage();
    String                  messageUser = message.getUser();

    for (int numRetries = 0; numRetries < maxRetries; numRetries++) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("handleMessage({}): attempt {}", message.getType().name(), numRetries);
        }

        try {
            RequestContext requestContext = RequestContext.createContext();
            requestContext.setUser(messageUser);

            switch (message.getType()) {
                case ENTITY_CREATE: {
                    EntityCreateRequest createRequest = (EntityCreateRequest) message;

                    if (numRetries == 0) { // audit only on the first attempt
                        audit(messageUser, CREATE_ENTITY.getMethod(), CREATE_ENTITY.getPath());
                    }

                    AtlasEntity.AtlasEntitiesWithExtInfo entities = instanceConverter.toAtlasEntities(createRequest.getEntities());

                    atlasEntityStore.createOrUpdate(new AtlasEntityStream(entities), false);
                    break;
                }

                case ENTITY_PARTIAL_UPDATE: {
                    EntityPartialUpdateRequest partialUpdateRequest = (EntityPartialUpdateRequest) message;

                    if (numRetries == 0) { // audit only on the first attempt
                        audit(messageUser, UPDATE_ENTITY_BY_ATTRIBUTE.getMethod(),
                              String.format(UPDATE_ENTITY_BY_ATTRIBUTE.getPath(), partialUpdateRequest.getTypeName()));
                    }

                    Referenceable                        referenceable = partialUpdateRequest.getEntity();
                    AtlasEntity.AtlasEntitiesWithExtInfo entities      = instanceConverter.toAtlasEntity(referenceable);
                    AtlasEntityType                      entityType    = typeRegistry.getEntityTypeByName(partialUpdateRequest.getTypeName());

                    // Plain map instead of double-brace initialization: no anonymous subclass,
                    // no hidden reference to the enclosing instance.
                    HashMap<String, Object> uniqueAttributes = new HashMap<String, Object>();
                    uniqueAttributes.put(partialUpdateRequest.getAttribute(), partialUpdateRequest.getAttributeValue());

                    String guid = AtlasGraphUtilsV1.getGuidByUniqueAttributes(entityType, uniqueAttributes);

                    // There should only be one root entity
                    entities.getEntities().get(0).setGuid(guid);

                    atlasEntityStore.createOrUpdate(new AtlasEntityStream(entities), true);
                    break;
                }

                case ENTITY_DELETE: {
                    EntityDeleteRequest deleteRequest = (EntityDeleteRequest) message;

                    if (numRetries == 0) { // audit only on the first attempt
                        audit(messageUser, DELETE_ENTITY_BY_ATTRIBUTE.getMethod(),
                              String.format(DELETE_ENTITY_BY_ATTRIBUTE.getPath(), deleteRequest.getTypeName()));
                    }

                    try {
                        AtlasEntityType type = (AtlasEntityType) typeRegistry.getType(deleteRequest.getTypeName());

                        HashMap<String, Object> uniqueAttributes = new HashMap<String, Object>();
                        uniqueAttributes.put(deleteRequest.getAttribute(), deleteRequest.getAttributeValue());

                        atlasEntityStore.deleteByUniqueAttributes(type, uniqueAttributes);
                    } catch (ClassCastException cce) {
                        // Deliberately not retried (a failed cast will not succeed on a later
                        // attempt); the message is treated as handled and committed below.
                        LOG.error("Failed to delete entity of type {} by unique attribute {}",
                                  deleteRequest.getTypeName(), deleteRequest.getAttribute(), cce);
                    }
                    break;
                }

                case ENTITY_FULL_UPDATE: {
                    EntityUpdateRequest updateRequest = (EntityUpdateRequest) message;

                    if (numRetries == 0) { // audit only on the first attempt
                        audit(messageUser, UPDATE_ENTITY.getMethod(), UPDATE_ENTITY.getPath());
                    }

                    AtlasEntity.AtlasEntitiesWithExtInfo entities = instanceConverter.toAtlasEntities(updateRequest.getEntities());

                    atlasEntityStore.createOrUpdate(new AtlasEntityStream(entities), false);
                    break;
                }

                default:
                    throw new IllegalStateException("Unknown notification type: " + message.getType().name());
            }

            // The attempt succeeded: leave the retry loop and commit.
            break;
        } catch (Throwable e) {
            if (numRetries == (maxRetries - 1)) {
                // Out of attempts: record the failure and give up right away instead of
                // sleeping for a retry that will never happen. The finally block below
                // still runs before the method returns.
                LOG.warn("Max retries exceeded for message {}", message, e);

                failedMessages.add(message);

                if (failedMessages.size() >= failedMsgCacheSize) {
                    recordFailedMessages();
                }

                return;
            }

            LOG.warn("Error handling message", e);

            try {
                LOG.info("Sleeping for {} ms before retry", consumerRetryInterval);

                Thread.sleep(consumerRetryInterval);
            } catch (InterruptedException ie) {
                LOG.error("Notification consumer thread sleep interrupted");

                // Restore the interrupt status so the owner of this thread can still
                // observe the interruption request.
                Thread.currentThread().interrupt();
            }
        } finally {
            RequestContext.clear();
            RequestContextV1.clear();
        }
    }

    commit(kafkaMsg);
}