void handleMessage()

in webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java [1133:1436]


        /**
         * Processes a single hook notification message and records its outcome.
         *
         * <p>Steps, in order:
         * <ol>
         *   <li>V1 {@code ENTITY_CREATE} / {@code ENTITY_FULL_UPDATE} requests are converted to their V2
         *       equivalents before pre-processing. If the conversion throws {@link AtlasBaseException}
         *       the original V1 message is kept and handled by the V1 cases of the retry loop.</li>
         *   <li>The message is pre-processed; if it is empty afterwards it is committed and the method returns.</li>
         *   <li>The message is applied up to {@code maxRetries} times. When the last attempt fails, or the
         *       failure is an {@link AtlasBaseException} whose error code maps to HTTP NOT_FOUND or
         *       BAD_REQUEST, the message is added to {@code failedMessages} and the method returns
         *       without reaching the {@code commit(kafkaMsg)} call that follows the loop.</li>
         *   <li>Otherwise the message is committed.</li>
         * </ol>
         *
         * <p>In every case the outer {@code finally} block logs the perf trace, reports stats to
         * {@code metricsUtil}, logs messages whose processing time exceeded
         * {@code largeMessageProcessingTimeThresholdMs}, emits the audit record (if one was created)
         * and logs the accumulated stats once {@code nextStatsLogTime} has passed.
         *
         * @param kafkaMsg the Kafka message wrapping the hook notification to process
         */
        void handleMessage(AtlasKafkaMessage<HookNotification> kafkaMsg) {
            AtlasPerfTracer  perf        = null;
            HookNotification message     = kafkaMsg.getMessage();
            String           messageUser = message.getUser();
            long             startTime   = System.currentTimeMillis();
            NotificationStat stats       = new NotificationStat();
            AuditLog         auditLog    = null;

            if (authorizeUsingMessageUser) {
                setCurrentUser(messageUser);
            }

            if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
                perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, message.getType().name());
            }

            try {
                // convert V1 messages to V2 to enable preProcess
                try {
                    switch (message.getType()) {
                        case ENTITY_CREATE: {
                            final EntityCreateRequest      createRequest = (EntityCreateRequest) message;
                            final AtlasEntitiesWithExtInfo entities      = instanceConverter.toAtlasEntities(createRequest.getEntities());
                            final EntityCreateRequestV2    v2Request     = new EntityCreateRequestV2(messageUser, entities);

                            kafkaMsg = new AtlasKafkaMessage<>(v2Request, kafkaMsg.getOffset(), kafkaMsg.getTopic(), kafkaMsg.getPartition());
                            message  = kafkaMsg.getMessage();
                        }
                        break;

                        case ENTITY_FULL_UPDATE: {
                            final EntityUpdateRequest      updateRequest = (EntityUpdateRequest) message;
                            final AtlasEntitiesWithExtInfo entities      = instanceConverter.toAtlasEntities(updateRequest.getEntities());
                            final EntityUpdateRequestV2    v2Request     = new EntityUpdateRequestV2(messageUser, entities);

                            kafkaMsg = new AtlasKafkaMessage<>(v2Request, kafkaMsg.getOffset(), kafkaMsg.getTopic(), kafkaMsg.getPartition());
                            message  = kafkaMsg.getMessage();
                        }
                        break;
                    }
                } catch (AtlasBaseException excp) {
                    // conversion failed: keep the V1 message, it is handled by the V1 cases in the retry loop below
                    LOG.error("handleMessage({}): failed to convert V1 message to V2", message.getType().name(), excp);
                }

                PreprocessorContext context = preProcessNotificationMessage(kafkaMsg);

                if (isEmptyMessage(kafkaMsg)) {
                    commit(kafkaMsg);

                    return;
                }

                // simple class name of the exception thrown by the previous attempt; empty when there was none
                String exceptionClassName = StringUtils.EMPTY;
                for (int numRetries = 0; numRetries < maxRetries; numRetries++) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("handleMessage({}): attempt {}", message.getType().name(), numRetries);
                    }

                    try {
                        // the request context is cleared in the finally block of every attempt, so set it up again here
                        RequestContext requestContext = RequestContext.get();

                        requestContext.setAttemptCount(numRetries + 1);
                        requestContext.setMaxAttempts(maxRetries);

                        requestContext.setUser(messageUser, null);
                        requestContext.setInNotificationProcessing(true);
                        requestContext.setCreateShellEntityForNonExistingReference(createShellEntityForNonExistingReference);

                        switch (message.getType()) {
                            case ENTITY_CREATE: {
                                final EntityCreateRequest      createRequest = (EntityCreateRequest) message;
                                final AtlasEntitiesWithExtInfo entities      = instanceConverter.toAtlasEntities(createRequest.getEntities());

                                if (auditLog == null) {
                                    auditLog = new AuditLog(messageUser, THREADNAME_PREFIX, AtlasClient.API_V1.CREATE_ENTITY.getMethod(), AtlasClient.API_V1.CREATE_ENTITY.getNormalizedPath());
                                }

                                createOrUpdate(entities, false, stats, context);
                            }
                            break;

                            case ENTITY_PARTIAL_UPDATE: {
                                final EntityPartialUpdateRequest partialUpdateRequest = (EntityPartialUpdateRequest) message;
                                final Referenceable              referenceable        = partialUpdateRequest.getEntity();
                                final AtlasEntitiesWithExtInfo   entities             = instanceConverter.toAtlasEntity(referenceable);

                                if (auditLog == null) {
                                    auditLog = new AuditLog(messageUser, THREADNAME_PREFIX,
                                            AtlasClientV2.API_V2.UPDATE_ENTITY_BY_ATTRIBUTE.getMethod(),
                                            String.format(AtlasClientV2.API_V2.UPDATE_ENTITY_BY_ATTRIBUTE.getNormalizedPath(), partialUpdateRequest.getTypeName()));
                                }

                                // resolve the guid of the entity identified by the (attribute, value) pair in the request
                                AtlasEntityType entityType = typeRegistry.getEntityTypeByName(partialUpdateRequest.getTypeName());
                                String          guid       = AtlasGraphUtilsV2.getGuidByUniqueAttributes(entityType, Collections.singletonMap(partialUpdateRequest.getAttribute(), partialUpdateRequest.getAttributeValue()));

                                // There should only be one root entity
                                entities.getEntities().get(0).setGuid(guid);

                                createOrUpdate(entities, true, stats, context);
                            }
                            break;

                            case ENTITY_DELETE: {
                                final EntityDeleteRequest deleteRequest = (EntityDeleteRequest) message;

                                if (auditLog == null) {
                                    auditLog = new AuditLog(messageUser, THREADNAME_PREFIX,
                                            AtlasClientV2.API_V2.DELETE_ENTITY_BY_ATTRIBUTE.getMethod(),
                                            String.format(AtlasClientV2.API_V2.DELETE_ENTITY_BY_ATTRIBUTE.getNormalizedPath(), deleteRequest.getTypeName()));
                                }

                                try {
                                    AtlasEntityType type = (AtlasEntityType) typeRegistry.getType(deleteRequest.getTypeName());

                                    EntityMutationResponse response = atlasEntityStore.deleteByUniqueAttributes(type, Collections.singletonMap(deleteRequest.getAttribute(), deleteRequest.getAttributeValue()));

                                    stats.updateStats(response);

                                    entityCorrelationManager.add(kafkaMsg.getSpooled(), kafkaMsg.getMsgCreated(), response.getDeletedEntities());
                                } catch (ClassCastException cle) {
                                    // logged and not rethrown: the attempt is then treated as completed (no retry)
                                    LOG.error("Failed to delete entity {}", deleteRequest, cle);
                                }
                            }
                            break;

                            case ENTITY_FULL_UPDATE: {
                                final EntityUpdateRequest      updateRequest = (EntityUpdateRequest) message;
                                final AtlasEntitiesWithExtInfo entities      = instanceConverter.toAtlasEntities(updateRequest.getEntities());

                                if (auditLog == null) {
                                    auditLog = new AuditLog(messageUser, THREADNAME_PREFIX, AtlasClientV2.API_V2.UPDATE_ENTITY.getMethod(), AtlasClientV2.API_V2.UPDATE_ENTITY.getNormalizedPath());
                                }

                                createOrUpdate(entities, false, stats, context);
                            }
                            break;

                            case ENTITY_CREATE_V2: {
                                final EntityCreateRequestV2    createRequestV2 = (EntityCreateRequestV2) message;
                                final AtlasEntitiesWithExtInfo entities        = createRequestV2.getEntities();

                                if (auditLog == null) {
                                    auditLog = new AuditLog(messageUser, THREADNAME_PREFIX, AtlasClientV2.API_V2.CREATE_ENTITY.getMethod(), AtlasClientV2.API_V2.CREATE_ENTITY.getNormalizedPath());
                                }

                                createOrUpdate(entities, false, stats, context);
                            }
                            break;

                            case ENTITY_PARTIAL_UPDATE_V2: {
                                final EntityPartialUpdateRequestV2 partialUpdateRequest = (EntityPartialUpdateRequestV2) message;
                                final AtlasObjectId                entityId             = partialUpdateRequest.getEntityId();
                                final AtlasEntityWithExtInfo       entity               = partialUpdateRequest.getEntity();

                                if (auditLog == null) {
                                    auditLog = new AuditLog(messageUser, THREADNAME_PREFIX, AtlasClientV2.API_V2.UPDATE_ENTITY.getMethod(), AtlasClientV2.API_V2.UPDATE_ENTITY.getNormalizedPath());
                                }

                                EntityMutationResponse response = atlasEntityStore.updateEntity(entityId, entity, true);

                                stats.updateStats(response);
                            }
                            break;

                            case ENTITY_FULL_UPDATE_V2: {
                                final EntityUpdateRequestV2    updateRequest = (EntityUpdateRequestV2) message;
                                final AtlasEntitiesWithExtInfo entities      = updateRequest.getEntities();

                                if (auditLog == null) {
                                    auditLog = new AuditLog(messageUser, THREADNAME_PREFIX, AtlasClientV2.API_V2.UPDATE_ENTITY.getMethod(), AtlasClientV2.API_V2.UPDATE_ENTITY.getNormalizedPath());
                                }

                                createOrUpdate(entities, false, stats, context);
                            }
                            break;

                            case ENTITY_DELETE_V2: {
                                final EntityDeleteRequestV2 deleteRequest = (EntityDeleteRequestV2) message;
                                final List<AtlasObjectId>   entities      = deleteRequest.getEntities();

                                try {
                                    for (AtlasObjectId entity : entities) {
                                        // only one audit record per message: it carries the type name of the first entity
                                        if (auditLog == null) {
                                            auditLog = new AuditLog(messageUser, THREADNAME_PREFIX,
                                                    AtlasClientV2.API_V2.DELETE_ENTITY_BY_ATTRIBUTE.getMethod(),
                                                    String.format(AtlasClientV2.API_V2.DELETE_ENTITY_BY_ATTRIBUTE.getNormalizedPath(), entity.getTypeName()));
                                        }

                                        AtlasEntityType type = (AtlasEntityType) typeRegistry.getType(entity.getTypeName());

                                        EntityMutationResponse response = atlasEntityStore.deleteByUniqueAttributes(type, entity.getUniqueAttributes());

                                        stats.updateStats(response);

                                        entityCorrelationManager.add(kafkaMsg.getSpooled(), kafkaMsg.getMsgCreated(), response.getDeletedEntities());
                                    }
                                } catch (ClassCastException cle) {
                                    // logged and not rethrown: remaining entities are skipped and the attempt is treated as completed
                                    LOG.error("Failed to do delete entities {}", entities, cle);
                                }
                            }
                            break;

                            default:
                                throw new IllegalStateException("Unknown notification type: " + message.getType().name());
                        }

                        // this attempt succeeded after an earlier attempt failed
                        if (StringUtils.isNotEmpty(exceptionClassName)) {
                            LOG.warn("{}: Pausing & retry: Try: {}: Pause: {} ms. Handled!", exceptionClassName, numRetries, adaptiveWaiter.waitDuration);

                            exceptionClassName = StringUtils.EMPTY;
                        }
                        break;
                    } catch (Throwable e) {
                        RequestContext.get().resetEntityGuidUpdates();
                        exceptionClassName = e.getClass().getSimpleName();

                        // don't retry in following conditions:
                        //  1. number of retry attempts reached configured count
                        //  2. notification processing failed due to invalid data (non-existing type, entity, ..)
                        boolean        maxRetriesReached    = numRetries == (maxRetries - 1);
                        AtlasErrorCode errorCode            = (e instanceof AtlasBaseException) ? ((AtlasBaseException) e).getAtlasErrorCode() : null;
                        boolean        unrecoverableFailure = errorCode != null && (Response.Status.NOT_FOUND.equals(errorCode.getHttpCode()) || Response.Status.BAD_REQUEST.equals(errorCode.getHttpCode()));

                        if (maxRetriesReached || unrecoverableFailure) {
                            try {
                                String strMessage = AbstractNotification.getMessageJson(message);

                                if (unrecoverableFailure) {
                                    LOG.warn("Unrecoverable failure while processing message {}", strMessage, e);
                                } else {
                                    LOG.warn("Max retries exceeded for message {}", strMessage, e);
                                }

                                stats.isFailedMsg = true;

                                failedMessages.add(strMessage);

                                if (failedMessages.size() >= failedMsgCacheSize) {
                                    recordFailedMessages();
                                }
                            } catch (Throwable t) {
                                LOG.warn("error while recording failed message: type={}, topic={}, partition={}, offset={}", message.getType(), kafkaMsg.getTopic(), kafkaMsg.getPartition(), kafkaMsg.getOffset(), t);
                            }

                            // returns without reaching commit(kafkaMsg) below; the outer finally block still runs
                            return;
                        } else if (e instanceof org.apache.atlas.repository.graphdb.AtlasSchemaViolationException) {
                            // retried immediately, without any pause
                            LOG.warn("{}: Continuing: {}", exceptionClassName, e.getMessage());
                        } else if (exceptionClassName.equals(EXCEPTION_CLASS_NAME_JANUSGRAPH_EXCEPTION) || exceptionClassName.equals(EXCEPTION_CLASS_NAME_PERMANENTLOCKING_EXCEPTION)) {
                            LOG.warn("{}: Pausing & retry: Try: {}: Pause: {} ms. {}", exceptionClassName, numRetries, adaptiveWaiter.waitDuration, e.getMessage());

                            adaptiveWaiter.pause(e);
                        } else {
                            LOG.warn("Error handling message", e);

                            try {
                                LOG.info("Sleeping for {} ms before retry", consumerRetryInterval);

                                Thread.sleep(consumerRetryInterval);
                            } catch (InterruptedException ie) {
                                // NOTE(review): the interrupt status is not restored here, so the retry loop simply
                                // continues - confirm whether the consumer thread should re-interrupt itself
                                LOG.error("Notification consumer thread sleep interrupted");
                            }
                        }
                    } finally {
                        RequestContext.clear();
                    }
                }

                commit(kafkaMsg);
            } finally {
                AtlasPerfTracer.log(perf);

                stats.timeTakenMs = System.currentTimeMillis() - startTime;

                metricsUtil.onNotificationProcessingComplete(kafkaMsg.getTopic(), kafkaMsg.getPartition(), kafkaMsg.getOffset(), stats);

                if (stats.timeTakenMs > largeMessageProcessingTimeThresholdMs) {
                    try {
                        String strMessage = AbstractNotification.getMessageJson(message);

                        LOG.warn("msgProcessingTime={}, msgSize={}, topicOffset={}", stats.timeTakenMs, strMessage.length(), kafkaMsg.getOffset());

                        LARGE_MESSAGES_LOG.warn("{\"msgProcessingTime\":{},\"msgSize\":{},\"topicOffset\":{},\"data\":{}}", stats.timeTakenMs, strMessage.length(), kafkaMsg.getOffset(), strMessage);
                    } catch (Throwable t) {
                        LOG.warn("error while recording large message: msgProcessingTime={}, type={}, topic={}, partition={}, offset={}", stats.timeTakenMs, message.getType(), kafkaMsg.getTopic(), kafkaMsg.getPartition(), kafkaMsg.getOffset(), t);
                    }
                }

                if (auditLog != null) {
                    auditLog.setHttpStatus(stats.isFailedMsg ? SC_BAD_REQUEST : SC_OK);
                    auditLog.setTimeTaken(stats.timeTakenMs);

                    AuditFilter.audit(auditLog);
                }

                Instant now = Instant.now();

                if (now.isAfter(nextStatsLogTime)) {
                    LOG.info("STATS: {}", AtlasJson.toJson(metricsUtil.getStats()));

                    nextStatsLogTime = AtlasMetricsCounter.getNextHourStartTime(now);
                }
            }
        }