in webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java [1133:1436]
/**
 * Processes a single hook notification consumed from Kafka.
 *
 * Flow: V1 create/full-update messages are first converted to their V2 equivalents
 * (so pre-processors only have to deal with V2), the message is pre-processed, and
 * the resulting entity mutation is attempted up to {@code maxRetries} times. On
 * success the Kafka offset is committed; on unrecoverable failure (bad data) or
 * retry exhaustion the message JSON is recorded in the failed-messages cache and
 * the method returns without committing further work. Processing stats, audit-log
 * entries and large-message diagnostics are always emitted in the finally block.
 *
 * @param kafkaMsg the consumed Kafka message wrapping a {@link HookNotification};
 *                 may be replaced locally when a V1 payload is converted to V2
 */
void handleMessage(AtlasKafkaMessage<HookNotification> kafkaMsg) {
    AtlasPerfTracer  perf        = null;
    HookNotification message     = kafkaMsg.getMessage();
    String           messageUser = message.getUser();
    long             startTime   = System.currentTimeMillis();
    NotificationStat stats       = new NotificationStat();
    AuditLog         auditLog    = null;

    // optionally impersonate the user recorded in the message for authorization
    if (authorizeUsingMessageUser) {
        setCurrentUser(messageUser);
    }

    if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
        perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, message.getType().name());
    }

    try {
        // convert V1 messages to V2 to enable preProcess
        try {
            switch (message.getType()) {
                case ENTITY_CREATE: {
                    final EntityCreateRequest      createRequest = (EntityCreateRequest) message;
                    final AtlasEntitiesWithExtInfo entities      = instanceConverter.toAtlasEntities(createRequest.getEntities());
                    final EntityCreateRequestV2    v2Request     = new EntityCreateRequestV2(message.getUser(), entities);

                    // re-wrap so downstream code (pre-process, retry loop) sees the V2 message
                    kafkaMsg = new AtlasKafkaMessage<>(v2Request, kafkaMsg.getOffset(), kafkaMsg.getTopic(), kafkaMsg.getPartition());
                    message  = kafkaMsg.getMessage();
                }
                break;

                case ENTITY_FULL_UPDATE: {
                    final EntityUpdateRequest      updateRequest = (EntityUpdateRequest) message;
                    final AtlasEntitiesWithExtInfo entities      = instanceConverter.toAtlasEntities(updateRequest.getEntities());
                    final EntityUpdateRequestV2    v2Request     = new EntityUpdateRequestV2(messageUser, entities);

                    kafkaMsg = new AtlasKafkaMessage<>(v2Request, kafkaMsg.getOffset(), kafkaMsg.getTopic(), kafkaMsg.getPartition());
                    message  = kafkaMsg.getMessage();
                }
                break;
            }
        } catch (AtlasBaseException excp) {
            // pass the exception as the last argument so SLF4J logs the stack trace;
            // on conversion failure the message stays V1 and is handled by the V1 cases below
            LOG.error("handleMessage({}): failed to convert V1 message to V2", message.getType().name(), excp);
        }

        PreprocessorContext context = preProcessNotificationMessage(kafkaMsg);

        // pre-processing may have stripped the message down to nothing; just commit and move on
        if (isEmptyMessage(kafkaMsg)) {
            commit(kafkaMsg);

            return;
        }

        // Used for intermediate conversions during create and update
        String exceptionClassName = StringUtils.EMPTY;

        for (int numRetries = 0; numRetries < maxRetries; numRetries++) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("handleMessage({}): attempt {}", message.getType().name(), numRetries);
            }

            try {
                RequestContext requestContext = RequestContext.get();

                requestContext.setAttemptCount(numRetries + 1);
                requestContext.setMaxAttempts(maxRetries);
                requestContext.setUser(messageUser, null);
                requestContext.setInNotificationProcessing(true);
                requestContext.setCreateShellEntityForNonExistingReference(createShellEntityForNonExistingReference);

                switch (message.getType()) {
                    // V1 cases remain reachable only when the V1->V2 conversion above failed
                    case ENTITY_CREATE: {
                        final EntityCreateRequest      createRequest = (EntityCreateRequest) message;
                        final AtlasEntitiesWithExtInfo entities      = instanceConverter.toAtlasEntities(createRequest.getEntities());

                        if (auditLog == null) {
                            auditLog = new AuditLog(messageUser, THREADNAME_PREFIX, AtlasClient.API_V1.CREATE_ENTITY.getMethod(), AtlasClient.API_V1.CREATE_ENTITY.getNormalizedPath());
                        }

                        createOrUpdate(entities, false, stats, context);
                    }
                    break;

                    case ENTITY_PARTIAL_UPDATE: {
                        final EntityPartialUpdateRequest partialUpdateRequest = (EntityPartialUpdateRequest) message;
                        final Referenceable              referenceable        = partialUpdateRequest.getEntity();
                        final AtlasEntitiesWithExtInfo   entities             = instanceConverter.toAtlasEntity(referenceable);

                        if (auditLog == null) {
                            auditLog = new AuditLog(messageUser, THREADNAME_PREFIX,
                                    AtlasClientV2.API_V2.UPDATE_ENTITY_BY_ATTRIBUTE.getMethod(),
                                    String.format(AtlasClientV2.API_V2.UPDATE_ENTITY_BY_ATTRIBUTE.getNormalizedPath(), partialUpdateRequest.getTypeName()));
                        }

                        // resolve the target guid from the unique attribute before the partial update
                        AtlasEntityType entityType = typeRegistry.getEntityTypeByName(partialUpdateRequest.getTypeName());
                        String          guid       = AtlasGraphUtilsV2.getGuidByUniqueAttributes(entityType, Collections.singletonMap(partialUpdateRequest.getAttribute(), partialUpdateRequest.getAttributeValue()));

                        // There should only be one root entity
                        entities.getEntities().get(0).setGuid(guid);

                        createOrUpdate(entities, true, stats, context);
                    }
                    break;

                    case ENTITY_DELETE: {
                        final EntityDeleteRequest deleteRequest = (EntityDeleteRequest) message;

                        if (auditLog == null) {
                            auditLog = new AuditLog(messageUser, THREADNAME_PREFIX,
                                    AtlasClientV2.API_V2.DELETE_ENTITY_BY_ATTRIBUTE.getMethod(),
                                    String.format(AtlasClientV2.API_V2.DELETE_ENTITY_BY_ATTRIBUTE.getNormalizedPath(), deleteRequest.getTypeName()));
                        }

                        try {
                            AtlasEntityType        type     = (AtlasEntityType) typeRegistry.getType(deleteRequest.getTypeName());
                            EntityMutationResponse response = atlasEntityStore.deleteByUniqueAttributes(type, Collections.singletonMap(deleteRequest.getAttribute(), deleteRequest.getAttributeValue()));

                            stats.updateStats(response);

                            entityCorrelationManager.add(kafkaMsg.getSpooled(), kafkaMsg.getMsgCreated(), response.getDeletedEntities());
                        } catch (ClassCastException cle) {
                            // type-name resolved to a non-entity type; include the exception in the log
                            LOG.error("Failed to delete entity {}", deleteRequest, cle);
                        }
                    }
                    break;

                    case ENTITY_FULL_UPDATE: {
                        final EntityUpdateRequest      updateRequest = (EntityUpdateRequest) message;
                        final AtlasEntitiesWithExtInfo entities      = instanceConverter.toAtlasEntities(updateRequest.getEntities());

                        if (auditLog == null) {
                            auditLog = new AuditLog(messageUser, THREADNAME_PREFIX, AtlasClientV2.API_V2.UPDATE_ENTITY.getMethod(), AtlasClientV2.API_V2.UPDATE_ENTITY.getNormalizedPath());
                        }

                        createOrUpdate(entities, false, stats, context);
                    }
                    break;

                    case ENTITY_CREATE_V2: {
                        final EntityCreateRequestV2    createRequestV2 = (EntityCreateRequestV2) message;
                        final AtlasEntitiesWithExtInfo entities        = createRequestV2.getEntities();

                        if (auditLog == null) {
                            auditLog = new AuditLog(messageUser, THREADNAME_PREFIX, AtlasClientV2.API_V2.CREATE_ENTITY.getMethod(), AtlasClientV2.API_V2.CREATE_ENTITY.getNormalizedPath());
                        }

                        createOrUpdate(entities, false, stats, context);
                    }
                    break;

                    case ENTITY_PARTIAL_UPDATE_V2: {
                        final EntityPartialUpdateRequestV2 partialUpdateRequest = (EntityPartialUpdateRequestV2) message;
                        final AtlasObjectId                entityId             = partialUpdateRequest.getEntityId();
                        final AtlasEntityWithExtInfo       entity               = partialUpdateRequest.getEntity();

                        if (auditLog == null) {
                            auditLog = new AuditLog(messageUser, THREADNAME_PREFIX, AtlasClientV2.API_V2.UPDATE_ENTITY.getMethod(), AtlasClientV2.API_V2.UPDATE_ENTITY.getNormalizedPath());
                        }

                        EntityMutationResponse response = atlasEntityStore.updateEntity(entityId, entity, true);

                        stats.updateStats(response);
                    }
                    break;

                    case ENTITY_FULL_UPDATE_V2: {
                        final EntityUpdateRequestV2    updateRequest = (EntityUpdateRequestV2) message;
                        final AtlasEntitiesWithExtInfo entities      = updateRequest.getEntities();

                        if (auditLog == null) {
                            auditLog = new AuditLog(messageUser, THREADNAME_PREFIX, AtlasClientV2.API_V2.UPDATE_ENTITY.getMethod(), AtlasClientV2.API_V2.UPDATE_ENTITY.getNormalizedPath());
                        }

                        createOrUpdate(entities, false, stats, context);
                    }
                    break;

                    case ENTITY_DELETE_V2: {
                        final EntityDeleteRequestV2 deleteRequest = (EntityDeleteRequestV2) message;
                        final List<AtlasObjectId>   entities      = deleteRequest.getEntities();

                        try {
                            for (AtlasObjectId entity : entities) {
                                if (auditLog == null) {
                                    auditLog = new AuditLog(messageUser, THREADNAME_PREFIX,
                                            AtlasClientV2.API_V2.DELETE_ENTITY_BY_ATTRIBUTE.getMethod(),
                                            String.format(AtlasClientV2.API_V2.DELETE_ENTITY_BY_ATTRIBUTE.getNormalizedPath(), entity.getTypeName()));
                                }

                                AtlasEntityType        type     = (AtlasEntityType) typeRegistry.getType(entity.getTypeName());
                                EntityMutationResponse response = atlasEntityStore.deleteByUniqueAttributes(type, entity.getUniqueAttributes());

                                stats.updateStats(response);

                                entityCorrelationManager.add(kafkaMsg.getSpooled(), kafkaMsg.getMsgCreated(), response.getDeletedEntities());
                            }
                        } catch (ClassCastException cle) {
                            // a type-name resolved to a non-entity type; include the exception in the log
                            LOG.error("Failed to do delete entities {}", entities, cle);
                        }
                    }
                    break;

                    default:
                        throw new IllegalStateException("Unknown notification type: " + message.getType().name());
                }

                // a previous attempt failed but this retry succeeded; note the recovery
                if (StringUtils.isNotEmpty(exceptionClassName)) {
                    LOG.warn("{}: Pausing & retry: Try: {}: Pause: {} ms. Handled!", exceptionClassName, numRetries, adaptiveWaiter.waitDuration);

                    exceptionClassName = StringUtils.EMPTY;
                }

                break;
            } catch (Throwable e) {
                RequestContext.get().resetEntityGuidUpdates();

                exceptionClassName = e.getClass().getSimpleName();

                // don't retry in following conditions:
                // 1. number of retry attempts reached configured count
                // 2. notification processing failed due to invalid data (non-existing type, entity, ..)
                boolean        maxRetriesReached    = numRetries == (maxRetries - 1);
                AtlasErrorCode errorCode            = (e instanceof AtlasBaseException) ? ((AtlasBaseException) e).getAtlasErrorCode() : null;
                boolean        unrecoverableFailure = errorCode != null && (Response.Status.NOT_FOUND.equals(errorCode.getHttpCode()) || Response.Status.BAD_REQUEST.equals(errorCode.getHttpCode()));

                if (maxRetriesReached || unrecoverableFailure) {
                    try {
                        String strMessage = AbstractNotification.getMessageJson(message);

                        if (unrecoverableFailure) {
                            LOG.warn("Unrecoverable failure while processing message {}", strMessage, e);
                        } else {
                            LOG.warn("Max retries exceeded for message {}", strMessage, e);
                        }

                        stats.isFailedMsg = true;

                        failedMessages.add(strMessage);

                        if (failedMessages.size() >= failedMsgCacheSize) {
                            recordFailedMessages();
                        }
                    } catch (Throwable t) {
                        LOG.warn("error while recording failed message: type={}, topic={}, partition={}, offset={}", message.getType(), kafkaMsg.getTopic(), kafkaMsg.getPartition(), kafkaMsg.getOffset(), t);
                    }

                    return;
                } else if (e instanceof org.apache.atlas.repository.graphdb.AtlasSchemaViolationException) {
                    // schema violations are expected under concurrent writes; retry immediately
                    LOG.warn("{}: Continuing: {}", exceptionClassName, e.getMessage());
                } else if (exceptionClassName.equals(EXCEPTION_CLASS_NAME_JANUSGRAPH_EXCEPTION) || exceptionClassName.equals(EXCEPTION_CLASS_NAME_PERMANENTLOCKING_EXCEPTION)) {
                    // known transient graph-db failures: back off adaptively before retrying
                    LOG.warn("{}: Pausing & retry: Try: {}: Pause: {} ms. {}", exceptionClassName, numRetries, adaptiveWaiter.waitDuration, e.getMessage());

                    adaptiveWaiter.pause(e);
                } else {
                    LOG.warn("Error handling message", e);

                    try {
                        LOG.info("Sleeping for {} ms before retry", consumerRetryInterval);

                        Thread.sleep(consumerRetryInterval);
                    } catch (InterruptedException ie) {
                        LOG.error("Notification consumer thread sleep interrupted", ie);

                        // restore the interrupt flag so the consumer can observe shutdown requests
                        Thread.currentThread().interrupt();
                    }
                }
            } finally {
                RequestContext.clear();
            }
        }

        commit(kafkaMsg);
    } finally {
        AtlasPerfTracer.log(perf);

        stats.timeTakenMs = System.currentTimeMillis() - startTime;

        metricsUtil.onNotificationProcessingComplete(kafkaMsg.getTopic(), kafkaMsg.getPartition(), kafkaMsg.getOffset(), stats);

        // messages that took unusually long are logged (with payload) for offline analysis
        if (stats.timeTakenMs > largeMessageProcessingTimeThresholdMs) {
            try {
                String strMessage = AbstractNotification.getMessageJson(message);

                LOG.warn("msgProcessingTime={}, msgSize={}, topicOffset={}", stats.timeTakenMs, strMessage.length(), kafkaMsg.getOffset());
                LARGE_MESSAGES_LOG.warn("{\"msgProcessingTime\":{},\"msgSize\":{},\"topicOffset\":{},\"data\":{}}", stats.timeTakenMs, strMessage.length(), kafkaMsg.getOffset(), strMessage);
            } catch (Throwable t) {
                LOG.warn("error while recording large message: msgProcessingTime={}, type={}, topic={}, partition={}, offset={}", stats.timeTakenMs, message.getType(), kafkaMsg.getTopic(), kafkaMsg.getPartition(), kafkaMsg.getOffset(), t);
            }
        }

        if (auditLog != null) {
            auditLog.setHttpStatus(stats.isFailedMsg ? SC_BAD_REQUEST : SC_OK);
            auditLog.setTimeTaken(stats.timeTakenMs);

            AuditFilter.audit(auditLog);
        }

        // emit aggregated stats at most once per hour
        Instant now = Instant.now();

        if (now.isAfter(nextStatsLogTime)) {
            LOG.info("STATS: {}", AtlasJson.toJson(metricsUtil.getStats()));

            nextStatsLogTime = AtlasMetricsCounter.getNextHourStartTime(now);
        }
    }
}