void handleMessage()

in webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java [238:346]


        void handleMessage(AtlasKafkaMessage<HookNotificationMessage> kafkaMsg) throws AtlasServiceException, AtlasException {
            HookNotificationMessage message = kafkaMsg.getMessage();
            String messageUser = message.getUser();
            // Used for intermediate conversions during create and update
            AtlasEntity.AtlasEntitiesWithExtInfo entities;
            for (int numRetries = 0; numRetries < maxRetries; numRetries++) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("handleMessage({}): attempt {}", message.getType().name(), numRetries);
                }
                try {
                    RequestContext requestContext = RequestContext.createContext();
                    requestContext.setUser(messageUser);

                    switch (message.getType()) {
                        case ENTITY_CREATE:
                            EntityCreateRequest createRequest = (EntityCreateRequest) message;

                            if (numRetries == 0) { // audit only on the first attempt
                                audit(messageUser, CREATE_ENTITY.getMethod(), CREATE_ENTITY.getPath());
                            }

                            entities = instanceConverter.toAtlasEntities(createRequest.getEntities());

                            atlasEntityStore.createOrUpdate(new AtlasEntityStream(entities), false);
                            break;

                        case ENTITY_PARTIAL_UPDATE:
                            final EntityPartialUpdateRequest partialUpdateRequest = (EntityPartialUpdateRequest) message;

                            if (numRetries == 0) { // audit only on the first attempt
                                audit(messageUser, UPDATE_ENTITY_BY_ATTRIBUTE.getMethod(),
                                        String.format(UPDATE_ENTITY_BY_ATTRIBUTE.getPath(), partialUpdateRequest.getTypeName()));
                            }

                            Referenceable referenceable = partialUpdateRequest.getEntity();
                            entities = instanceConverter.toAtlasEntity(referenceable);

                            AtlasEntityType entityType = typeRegistry.getEntityTypeByName(partialUpdateRequest.getTypeName());
                            String guid = AtlasGraphUtilsV1.getGuidByUniqueAttributes(entityType, new HashMap<String, Object>() {
                                {
                                    put(partialUpdateRequest.getAttribute(), partialUpdateRequest.getAttributeValue());
                                }
                            });

                            // There should only be one root entity
                            entities.getEntities().get(0).setGuid(guid);

                            atlasEntityStore.createOrUpdate(new AtlasEntityStream(entities), true);
                            break;

                        case ENTITY_DELETE:
                            final EntityDeleteRequest deleteRequest = (EntityDeleteRequest) message;

                            if (numRetries == 0) { // audit only on the first attempt
                                audit(messageUser, DELETE_ENTITY_BY_ATTRIBUTE.getMethod(),
                                        String.format(DELETE_ENTITY_BY_ATTRIBUTE.getPath(), deleteRequest.getTypeName()));
                            }

                            try {
                                AtlasEntityType type = (AtlasEntityType) typeRegistry.getType(deleteRequest.getTypeName());
                                atlasEntityStore.deleteByUniqueAttributes(type,
                                        new HashMap<String, Object>() {{
                                            put(deleteRequest.getAttribute(), deleteRequest.getAttributeValue());
                                        }});
                            } catch (ClassCastException cle) {
                                LOG.error("Failed to do a partial update on Entity");
                            }
                            break;

                        case ENTITY_FULL_UPDATE:
                            EntityUpdateRequest updateRequest = (EntityUpdateRequest) message;

                            if (numRetries == 0) { // audit only on the first attempt
                                audit(messageUser, UPDATE_ENTITY.getMethod(), UPDATE_ENTITY.getPath());
                            }

                            entities = instanceConverter.toAtlasEntities(updateRequest.getEntities());
                            atlasEntityStore.createOrUpdate(new AtlasEntityStream(entities), false);
                            break;

                        default:
                            throw new IllegalStateException("Unknown notification type: " + message.getType().name());
                    }

                    break;
                } catch (Throwable e) {
                    LOG.warn("Error handling message", e);
                    try {
                        LOG.info("Sleeping for {} ms before retry", consumerRetryInterval);
                        Thread.sleep(consumerRetryInterval);
                    } catch (InterruptedException ie) {
                        LOG.error("Notification consumer thread sleep interrupted");
                    }

                    if (numRetries == (maxRetries - 1)) {
                        LOG.warn("Max retries exceeded for message {}", message, e);
                        failedMessages.add(message);
                        if (failedMessages.size() >= failedMsgCacheSize) {
                            recordFailedMessages();
                        }
                        return;
                    }
                } finally {
                    RequestContext.clear();
                    RequestContextV1.clear();
                }
            }
            commit(kafkaMsg);
        }