public T deserialize()

in notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java [142:287]


    /**
     * Deserializes one raw notification JSON string into its payload.
     *
     * <p>Three input shapes are handled:
     * <ul>
     *   <li>un-versioned ("older style") JSON that is not wrapped in an {@code AtlasNotificationMessage};
     *       it is parsed directly as {@code messageType};</li>
     *   <li>a single wrapped message, optionally GZIP-compressed;</li>
     *   <li>one segment of a multi-part message (split count greater than 1); segments are buffered in
     *       {@code splitMsgBuffer} and the payload is produced only once the aggregator reports the
     *       message as complete.</li>
     * </ul>
     *
     * <p>Side effects on every call: both message counters are incremented, and {@code msgCreated},
     * {@code spooled} and {@code source} are reset and then re-populated from the wrapper (when present).
     * Once {@code splitMessageBufferPurgeIntervalMs} has elapsed since the last purge, stale split
     * messages are purged and processing stats are logged.
     *
     * @param messageJson the raw notification JSON
     * @return the deserialized payload; {@code null} when the input is a multi-part segment that was
     *         ignored or that did not complete its message
     */
    public T deserialize(String messageJson) {
        final T ret;

        messageCountTotal.incrementAndGet();
        messageCountSinceLastInterval.incrementAndGet();

        // per-message metadata; stays at these defaults for un-wrapped (older style) messages
        this.msgCreated = 0;
        this.spooled    = false;
        this.source     = null;

        AtlasNotificationBaseMessage msg = AtlasType.fromV1Json(messageJson, AtlasNotificationMessage.class);

        if (msg == null || msg.getVersion() == null) { // older style messages not wrapped with AtlasNotificationMessage
            ret = AtlasType.fromV1Json(messageJson, messageType);
        } else {
            this.msgCreated = ((AtlasNotificationMessage) msg).getMsgCreationTime();
            this.spooled    = ((AtlasNotificationMessage) msg).getSpooled();
            this.source     = msg.getSource() != null ? msg.getSource().getSource() : null;

            String msgJson = messageJson;

            if (msg.getMsgSplitCount() > 1) { // multi-part message
                msgJson = assembleSplitMessage(messageJson);

                // null => segment was ignored, or more segments are still to arrive
                msg = msgJson != null ? AtlasType.fromV1Json(msgJson, AtlasNotificationBaseMessage.class) : null;
            }

            if (msg != null) {
                if (CompressionKind.GZIP.equals(msg.getMsgCompressionKind())) {
                    // the wrapper carries the real message as a base64-encoded, gzip-compressed string
                    AtlasNotificationStringMessage compressedMsg = AtlasType.fromV1Json(msgJson, AtlasNotificationStringMessage.class);

                    byte[] encodedBytes = AtlasNotificationBaseMessage.getBytesUtf8(compressedMsg.getMessage());
                    byte[] bytes        = AtlasNotificationBaseMessage.decodeBase64AndGzipUncompress(encodedBytes);

                    msgJson = AtlasNotificationBaseMessage.getStringUtf8(bytes);

                    LOG.info("Received msgID={}: compressed={} bytes, uncompressed={} bytes", compressedMsg.getMsgId(), encodedBytes.length, bytes.length);
                }

                AtlasNotificationMessage<T> atlasNotificationMessage = AtlasType.fromV1Json(msgJson, notificationMessageType);

                checkCrossCombatMessageVersion(atlasNotificationMessage);
                checkVersion(atlasNotificationMessage, msgJson);

                ret = atlasNotificationMessage.getMessage();
            } else {
                ret = null;
            }
        }

        // housekeeping piggy-backs on message arrival: purge stale split messages and report stats
        long now                = System.currentTimeMillis();
        long timeSinceLastPurge = now - splitMessagesLastPurgeTime;

        if (timeSinceLastPurge >= splitMessageBufferPurgeIntervalMs) {
            purgeStaleMessages(splitMsgBuffer, now, splitMessageSegmentsWaitTimeMs);

            LOG.info("Notification processing stats: total={}, sinceLastStatsReport={}", messageCountTotal.get(), messageCountSinceLastInterval.getAndSet(0));

            splitMessagesLastPurgeTime = now;
        }

        return ret;
    }

    /**
     * Buffers one segment of a multi-part message and, when the aggregator reports the message as
     * complete, returns the re-assembled and decoded message JSON.
     *
     * <p>The concatenated segment payloads are base64-decoded; they are additionally gzip-uncompressed
     * when the last segment's compression kind is GZIP.
     *
     * @param segmentJson raw JSON of one segment
     * @return the decoded JSON of the complete message, or {@code null} when the segment was ignored
     *         (no message ID, first segment never seen, index out of bounds, a segment missing at
     *         assembly time) or when more segments are still expected
     */
    private String assembleSplitMessage(String segmentJson) {
        final AtlasNotificationStringMessage segment = AtlasType.fromV1Json(segmentJson, AtlasNotificationStringMessage.class);

        checkVersion(segment, segmentJson);

        final String msgId = segment.getMsgId();

        if (StringUtils.isEmpty(msgId)) {
            LOG.error("Received multi-part message with no message ID. Ignoring message");

            return null;
        }

        final int splitIdx   = segment.getMsgSplitIdx();
        final int splitCount = segment.getMsgSplitCount();

        final SplitMessageAggregator splitMsgs;

        if (splitIdx == 0) { // first segment starts (or restarts) aggregation for this message ID
            splitMsgs = new SplitMessageAggregator(segment);

            splitMsgBuffer.put(splitMsgs.getMsgId(), splitMsgs);
        } else {
            splitMsgs = splitMsgBuffer.get(msgId);
        }

        if (splitMsgs == null) {
            LOG.error("Received msgID={}: {} of {}, but first message didn't arrive. Ignoring message", msgId, splitIdx + 1, splitCount);

            return null;
        }

        // a negative index is never a valid segment position; reject it here together with
        // indices at or beyond the aggregator's total, instead of handing it to the aggregator
        if (splitIdx < 0 || splitMsgs.getTotalSplitCount() <= splitIdx) {
            LOG.error("Received msgID={}: {} of {} - out of bounds. Ignoring message", msgId, splitIdx + 1, splitCount);

            return null;
        }

        LOG.info("Received msgID={}: {} of {}", msgId, splitIdx + 1, splitCount);

        if (!splitMsgs.add(segment)) { // more messages to arrive
            return null;
        }

        // last message: stop tracking this ID before assembling, whether or not assembly succeeds
        splitMsgBuffer.remove(msgId);

        final StringBuilder            sb   = new StringBuilder();
        AtlasNotificationStringMessage part = segment;

        for (int i = 0; i < splitMsgs.getTotalSplitCount(); i++) {
            part = splitMsgs.get(i);

            if (part == null) {
                LOG.warn("MsgID={}: message {} of {} is missing. Ignoring message", msgId, i + 1, splitCount);

                return null;
            }

            sb.append(part.getMessage());
        }

        // 'part' now refers to the last segment; its compression kind decides how to decode the payload
        final boolean isGzip       = CompressionKind.GZIP.equals(part.getMsgCompressionKind());
        final byte[]  encodedBytes = AtlasNotificationBaseMessage.getBytesUtf8(sb.toString());
        final byte[]  bytes        = isGzip ? AtlasNotificationBaseMessage.decodeBase64AndGzipUncompress(encodedBytes)
                                            : AtlasNotificationBaseMessage.decodeBase64(encodedBytes);
        final String  assembled    = AtlasNotificationBaseMessage.getStringUtf8(bytes);

        if (isGzip) {
            LOG.info("Received msgID={}: splitCount={}, compressed={} bytes, uncompressed={} bytes", msgId, splitCount, encodedBytes.length, bytes.length);
        } else {
            LOG.info("Received msgID={}: splitCount={}, length={} bytes", msgId, splitCount, bytes.length);
        }

        return assembled;
    }