private void processMessage()

in inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/httpMsg/HttpMessageHandler.java [263:393]


    /**
     * Validates one HTTP report request, wraps its body into an InLongMsg package and
     * hands the resulting Flume event to the source's channel processor.
     *
     * <p>Validation is done in this order: groupId, streamId, configured topic, body presence,
     * body length. The first failed check records a file metric, sends an error response and
     * returns without building an event.
     *
     * @param ctx        channel context the response is written to
     * @param reqAttrs   request attributes; the keys read here are KEY_CALLBACK, KEY_GROUP_ID,
     *                   KEY_STREAM_ID, KEY_DATA_TIME, KEY_BODY and KEY_MESSAGE_COUNT
     *                   (plus whatever {@code AuditUtils.getAuditVersion} reads)
     * @param msgRcvTime receive time of the request; also used as the data time when the
     *                   request carries no parsable KEY_DATA_TIME value
     * @param clientIp   client address recorded in the message attributes and event headers
     * @param isCloseCon forwarded unchanged to the response helpers
     * @param callback   is not a parameter; it is read from {@code reqAttrs} and forwarded
     *                   to the response helpers
     * @throws Exception declared for the helpers called outside the channel try/catch
     */
    private void processMessage(ChannelHandlerContext ctx, Map<String, String> reqAttrs,
            long msgRcvTime, String clientIp, boolean isCloseCon) throws Exception {
        // scratch buffer, reused for error texts, the attribute string and metric keys
        StringBuilder strBuff = new StringBuilder(512);
        String callback = reqAttrs.get(HttpAttrConst.KEY_CALLBACK);
        // get and check groupId
        String groupId = reqAttrs.get(HttpAttrConst.KEY_GROUP_ID);
        if (StringUtils.isBlank(groupId)) {
            source.fileMetricIncSumStats(StatConstants.EVENT_MSG_GROUPID_MISSING);
            sendResponse(ctx, DataProxyErrCode.MISS_REQUIRED_GROUPID_ARGUMENT.getErrCode(),
                    strBuff.append("Field ").append(HttpAttrConst.KEY_GROUP_ID)
                            .append(" must exist and not blank!").toString(),
                    isCloseCon, callback);
            return;
        }
        // get and check streamId
        String streamId = reqAttrs.get(HttpAttrConst.KEY_STREAM_ID);
        if (StringUtils.isBlank(streamId)) {
            source.fileMetricIncWithDetailStats(StatConstants.EVENT_MSG_STREAMID_MISSING, groupId);
            sendResponse(ctx, DataProxyErrCode.MISS_REQUIRED_STREAMID_ARGUMENT.getErrCode(),
                    strBuff.append("Field ").append(HttpAttrConst.KEY_STREAM_ID)
                            .append(" must exist and not blank!").toString(),
                    isCloseCon, callback);
            return;
        }
        // get and check topicName
        String topicName = ConfigManager.getInstance().getTopicName(groupId, streamId);
        if (StringUtils.isEmpty(topicName)) {
            source.fileMetricIncWithDetailStats(StatConstants.EVENT_SOURCE_TOPIC_MISSING, groupId);
            // message shape: "<groupKey>(<groupId>),<streamKey>(<streamId>)"
            sendResponse(ctx, DataProxyErrCode.TOPIC_IS_BLANK.getErrCode(),
                    strBuff.append("Topic not configured for ").append(HttpAttrConst.KEY_GROUP_ID)
                            .append("(").append(groupId).append("),")
                            .append(HttpAttrConst.KEY_STREAM_ID)
                            .append("(").append(streamId).append(")").toString(),
                    isCloseCon, callback);
            return;
        }
        // get and check dt
        long dataTime = msgRcvTime;
        String dt = reqAttrs.get(HttpAttrConst.KEY_DATA_TIME);
        if (StringUtils.isNotEmpty(dt)) {
            try {
                dataTime = Long.parseLong(dt);
            } catch (NumberFormatException ignored) {
                // best effort: an unparsable dt is not an error, keep the receive time
            }
        }
        // get and check body
        String body = reqAttrs.get(HttpAttrConst.KEY_BODY);
        if (StringUtils.isBlank(body)) {
            // distinguish "field absent" from "field present but blank" for metrics and error code
            if (body == null) {
                source.fileMetricIncWithDetailStats(StatConstants.EVENT_MSG_BODY_MISSING, groupId);
                sendResponse(ctx, DataProxyErrCode.MISS_REQUIRED_BODY_ARGUMENT.getErrCode(),
                        strBuff.append("Field ").append(HttpAttrConst.KEY_BODY)
                                .append(" is not exist!").toString(),
                        isCloseCon, callback);
            } else {
                source.fileMetricIncWithDetailStats(StatConstants.EVENT_MSG_BODY_BLANK, groupId);
                sendResponse(ctx, DataProxyErrCode.EMPTY_MSG.getErrCode(),
                        strBuff.append("Field ").append(HttpAttrConst.KEY_BODY)
                                .append(" is Blank!").toString(),
                        isCloseCon, callback);
            }
            return;
        }
        // NOTE(review): this compares the character count, while the payload below is sent as
        // bytes in VAL_DEF_CHARSET - confirm whether the limit is meant in chars or in bytes
        if (body.length() > source.getMaxMsgLength()) {
            source.fileMetricIncWithDetailStats(StatConstants.EVENT_MSG_BODY_OVERMAX, groupId);
            sendResponse(ctx, DataProxyErrCode.BODY_EXCEED_MAX_LEN.getErrCode(),
                    strBuff.append("Error msg, the ").append(HttpAttrConst.KEY_BODY)
                            .append(" length(").append(body.length())
                            .append(") is bigger than allowed length(")
                            .append(source.getMaxMsgLength()).append(")").toString(),
                    isCloseCon, callback);
            return;
        }
        // get message count, defaulting to 1 when absent or not a number
        int intMsgCnt = NumberUtils.toInt(reqAttrs.get(HttpAttrConst.KEY_MESSAGE_COUNT), 1);
        String strMsgCount = String.valueOf(intMsgCnt);
        // get audit version
        long auditVersion = AuditUtils.getAuditVersion(reqAttrs);
        // build message attributes; strBuff is still empty here because every earlier use returned
        InLongMsg inLongMsg = InLongMsg.newInLongMsg(source.isCompressed());
        strBuff.append("groupId=").append(groupId)
                .append("&streamId=").append(streamId)
                .append("&dt=").append(dataTime)
                .append("&clientIp=").append(clientIp)
                .append("&cnt=").append(strMsgCount)
                .append("&rt=").append(msgRcvTime)
                .append(AttributeConstants.SEPARATOR).append(AttributeConstants.MSG_RPT_TIME)
                .append(AttributeConstants.KEY_VALUE_SEPARATOR).append(msgRcvTime);
        // the audit version attribute is only added when a version was resolved (-1 means none)
        if (auditVersion != -1L) {
            strBuff.append(AttributeConstants.SEPARATOR).append(AttributeConstants.AUDIT_VERSION)
                    .append(AttributeConstants.KEY_VALUE_SEPARATOR).append(auditVersion);
        }
        inLongMsg.addMsg(strBuff.toString(), body.getBytes(HttpAttrConst.VAL_DEF_CHARSET));
        byte[] inlongMsgData = inLongMsg.buildArray();
        long pkgTime = inLongMsg.getCreatetime();
        inLongMsg.reset();
        // clear the scratch buffer before it is handed to the metric helpers
        strBuff.setLength(0);
        // build flume event
        Map<String, String> eventHeaders = new HashMap<>();
        eventHeaders.put(AttributeConstants.GROUP_ID, groupId);
        eventHeaders.put(AttributeConstants.STREAM_ID, streamId);
        eventHeaders.put(ConfigConstants.TOPIC_KEY, topicName);
        eventHeaders.put(AttributeConstants.DATA_TIME, String.valueOf(dataTime));
        eventHeaders.put(ConfigConstants.REMOTE_IP_KEY, clientIp);
        eventHeaders.put(ConfigConstants.DATAPROXY_IP_KEY, source.getSrcHost());
        eventHeaders.put(ConfigConstants.MSG_COUNTER_KEY, strMsgCount);
        eventHeaders.put(ConfigConstants.MSG_ENCODE_VER,
                MessageWrapType.INLONG_MSG_V0.getStrId());
        eventHeaders.put(EventConstants.HEADER_KEY_VERSION,
                MessageWrapType.INLONG_MSG_V0.getStrId());
        eventHeaders.put(AttributeConstants.RCV_TIME, String.valueOf(msgRcvTime));
        eventHeaders.put(ConfigConstants.PKG_TIME_KEY, String.valueOf(pkgTime));
        // unlike the attribute string above, this header is written even when the value is -1
        eventHeaders.put(AttributeConstants.AUDIT_VERSION, String.valueOf(auditVersion));
        Event event = EventBuilder.withBody(inlongMsgData, eventHeaders);
        try {
            source.getCachedChProcessor().processEvent(event);
            source.fileMetricAddSuccStats(strBuff, groupId, streamId, topicName, clientIp,
                    "b2b", dataTime, pkgTime, intMsgCnt, 1, event.getBody().length);
            source.addMetric(true, event.getBody().length, event);
            sendSuccessResponse(ctx, isCloseCon, callback);
        } catch (Throwable ex) {
            source.fileMetricAddFailStats(strBuff, groupId, streamId, topicName, clientIp,
                    "b2b", dataTime, pkgTime, 1);
            source.addMetric(false, event.getBody().length, event);
            // the metric helpers received strBuff; start from an empty buffer so that nothing
            // they may have left behind leaks into the error text sent to the client
            strBuff.setLength(0);
            sendErrorMsg(ctx, DataProxyErrCode.PUT_EVENT_TO_CHANNEL_FAILURE,
                    strBuff.append("Put HTTP event to channel failure: ").append(ex.getMessage()).toString(), callback);
            // logging is rate-limited through logCounter
            if (logCounter.shouldPrint()) {
                logger.error("Error writing HTTP event to channel failure.", ex);
            }
        }
    }