in inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/httpMsg/HttpMessageHandler.java [263:393]
/**
 * Validates one HTTP report request, wraps its body into an InLongMsg package and
 * hands the resulting event to the source's channel processor.
 *
 * <p>Checks run in this order, and the first failure answers the client and returns:
 * groupId present, streamId present, topic configured for (groupId, streamId),
 * body present and not blank, body length within {@code source.getMaxMsgLength()}.
 * Each rejection also bumps the matching file-metric counter on {@code source}.
 *
 * @param ctx        channel context used to write the response
 * @param reqAttrs   request attributes keyed by the {@code HttpAttrConst.KEY_*} names
 * @param msgRcvTime receive time of the request; also used as the data time when the
 *                   request carries no parseable {@code dt} value
 * @param clientIp   address of the reporting client, recorded in attributes and headers
 * @param isCloseCon forwarded to the response helpers
 *                   (presumably "close the connection after responding" - confirm against callers)
 * @throws Exception propagated from the response helpers or message building
 */
private void processMessage(ChannelHandlerContext ctx, Map<String, String> reqAttrs,
        long msgRcvTime, String clientIp, boolean isCloseCon) throws Exception {
    // One buffer is reused for error texts, the attribute string and the metric keys
    StringBuilder strBuff = new StringBuilder(512);
    String callback = reqAttrs.get(HttpAttrConst.KEY_CALLBACK);
    // get and check groupId
    String groupId = reqAttrs.get(HttpAttrConst.KEY_GROUP_ID);
    if (StringUtils.isBlank(groupId)) {
        source.fileMetricIncSumStats(StatConstants.EVENT_MSG_GROUPID_MISSING);
        sendResponse(ctx, DataProxyErrCode.MISS_REQUIRED_GROUPID_ARGUMENT.getErrCode(),
                strBuff.append("Field ").append(HttpAttrConst.KEY_GROUP_ID)
                        .append(" must exist and not blank!").toString(),
                isCloseCon, callback);
        return;
    }
    // get and check streamId
    String streamId = reqAttrs.get(HttpAttrConst.KEY_STREAM_ID);
    if (StringUtils.isBlank(streamId)) {
        source.fileMetricIncWithDetailStats(StatConstants.EVENT_MSG_STREAMID_MISSING, groupId);
        sendResponse(ctx, DataProxyErrCode.MISS_REQUIRED_STREAMID_ARGUMENT.getErrCode(),
                strBuff.append("Field ").append(HttpAttrConst.KEY_STREAM_ID)
                        .append(" must exist and not blank!").toString(),
                isCloseCon, callback);
        return;
    }
    // get and check topicName
    String topicName = ConfigManager.getInstance().getTopicName(groupId, streamId);
    if (StringUtils.isEmpty(topicName)) {
        source.fileMetricIncWithDetailStats(StatConstants.EVENT_SOURCE_TOPIC_MISSING, groupId);
        // Rendered as "<groupKey>(<groupId>),<streamKey>(<streamId>)"
        sendResponse(ctx, DataProxyErrCode.TOPIC_IS_BLANK.getErrCode(),
                strBuff.append("Topic not configured for ").append(HttpAttrConst.KEY_GROUP_ID)
                        .append("(").append(groupId).append("),")
                        .append(HttpAttrConst.KEY_STREAM_ID)
                        .append("(").append(streamId).append(")").toString(),
                isCloseCon, callback);
        return;
    }
    // get and check dt
    long dataTime = msgRcvTime;
    String dt = reqAttrs.get(HttpAttrConst.KEY_DATA_TIME);
    if (StringUtils.isNotEmpty(dt)) {
        try {
            dataTime = Long.parseLong(dt);
        } catch (NumberFormatException ignored) {
            // Best effort: an unparseable dt is not an error, the receive time is kept
        }
    }
    // get and check body
    String body = reqAttrs.get(HttpAttrConst.KEY_BODY);
    if (StringUtils.isBlank(body)) {
        // A missing field and a present-but-blank field get different codes and counters
        if (body == null) {
            source.fileMetricIncWithDetailStats(StatConstants.EVENT_MSG_BODY_MISSING, groupId);
            sendResponse(ctx, DataProxyErrCode.MISS_REQUIRED_BODY_ARGUMENT.getErrCode(),
                    strBuff.append("Field ").append(HttpAttrConst.KEY_BODY)
                            .append(" is not exist!").toString(),
                    isCloseCon, callback);
        } else {
            source.fileMetricIncWithDetailStats(StatConstants.EVENT_MSG_BODY_BLANK, groupId);
            sendResponse(ctx, DataProxyErrCode.EMPTY_MSG.getErrCode(),
                    strBuff.append("Field ").append(HttpAttrConst.KEY_BODY)
                            .append(" is Blank!").toString(),
                    isCloseCon, callback);
        }
        return;
    }
    // The limit is compared against the character count of the body string
    if (body.length() > source.getMaxMsgLength()) {
        source.fileMetricIncWithDetailStats(StatConstants.EVENT_MSG_BODY_OVERMAX, groupId);
        sendResponse(ctx, DataProxyErrCode.BODY_EXCEED_MAX_LEN.getErrCode(),
                strBuff.append("Error msg, the ").append(HttpAttrConst.KEY_BODY)
                        .append(" length(").append(body.length())
                        .append(") is bigger than allowed length(")
                        .append(source.getMaxMsgLength()).append(")").toString(),
                isCloseCon, callback);
        return;
    }
    // get message count, defaulting to 1 when absent or not a number
    int intMsgCnt = NumberUtils.toInt(reqAttrs.get(HttpAttrConst.KEY_MESSAGE_COUNT), 1);
    String strMsgCount = String.valueOf(intMsgCnt);
    // get audit version
    long auditVersion = AuditUtils.getAuditVersion(reqAttrs);
    // build message attributes
    InLongMsg inLongMsg = InLongMsg.newInLongMsg(source.isCompressed());
    strBuff.append("groupId=").append(groupId)
            .append("&streamId=").append(streamId)
            .append("&dt=").append(dataTime)
            .append("&clientIp=").append(clientIp)
            .append("&cnt=").append(strMsgCount)
            .append("&rt=").append(msgRcvTime)
            .append(AttributeConstants.SEPARATOR).append(AttributeConstants.MSG_RPT_TIME)
            .append(AttributeConstants.KEY_VALUE_SEPARATOR).append(msgRcvTime);
    // NOTE(review): -1 looks like the "no audit version supplied" marker - confirm in AuditUtils
    if (auditVersion != -1L) {
        strBuff.append(AttributeConstants.SEPARATOR).append(AttributeConstants.AUDIT_VERSION)
                .append(AttributeConstants.KEY_VALUE_SEPARATOR).append(auditVersion);
    }
    inLongMsg.addMsg(strBuff.toString(), body.getBytes(HttpAttrConst.VAL_DEF_CHARSET));
    byte[] inlongMsgData = inLongMsg.buildArray();
    long pkgTime = inLongMsg.getCreatetime();
    inLongMsg.reset();
    // Empty the buffer so it can be lent to the metric helpers below
    strBuff.setLength(0);
    // build flume event
    Map<String, String> eventHeaders = new HashMap<>();
    eventHeaders.put(AttributeConstants.GROUP_ID, groupId);
    eventHeaders.put(AttributeConstants.STREAM_ID, streamId);
    eventHeaders.put(ConfigConstants.TOPIC_KEY, topicName);
    eventHeaders.put(AttributeConstants.DATA_TIME, String.valueOf(dataTime));
    eventHeaders.put(ConfigConstants.REMOTE_IP_KEY, clientIp);
    eventHeaders.put(ConfigConstants.DATAPROXY_IP_KEY, source.getSrcHost());
    eventHeaders.put(ConfigConstants.MSG_COUNTER_KEY, strMsgCount);
    eventHeaders.put(ConfigConstants.MSG_ENCODE_VER,
            MessageWrapType.INLONG_MSG_V0.getStrId());
    eventHeaders.put(EventConstants.HEADER_KEY_VERSION,
            MessageWrapType.INLONG_MSG_V0.getStrId());
    eventHeaders.put(AttributeConstants.RCV_TIME, String.valueOf(msgRcvTime));
    eventHeaders.put(ConfigConstants.PKG_TIME_KEY, String.valueOf(pkgTime));
    // Unlike the attribute string, the header is written even when the version is -1
    eventHeaders.put(AttributeConstants.AUDIT_VERSION, String.valueOf(auditVersion));
    Event event = EventBuilder.withBody(inlongMsgData, eventHeaders);
    try {
        source.getCachedChProcessor().processEvent(event);
        source.fileMetricAddSuccStats(strBuff, groupId, streamId, topicName, clientIp,
                "b2b", dataTime, pkgTime, intMsgCnt, 1, event.getBody().length);
        source.addMetric(true, event.getBody().length, event);
        sendSuccessResponse(ctx, isCloseCon, callback);
    } catch (Throwable ex) {
        // NOTE(review): a failure in the success-path calls after processEvent also lands
        // here, so the same event can be counted as both success and failure - confirm intended
        source.fileMetricAddFailStats(strBuff, groupId, streamId, topicName, clientIp,
                "b2b", dataTime, pkgTime, 1);
        source.addMetric(false, event.getBody().length, event);
        // The buffer was lent to the metric helpers; clear it so the client-facing text
        // cannot carry leftover content (whether the helpers clear it is not visible here)
        strBuff.setLength(0);
        sendErrorMsg(ctx, DataProxyErrCode.PUT_EVENT_TO_CHANNEL_FAILURE,
                strBuff.append("Put HTTP event to channel failure: ")
                        .append(ex.getMessage()).toString(),
                callback);
        if (logCounter.shouldPrint()) {
            logger.error("Error writing HTTP event to channel failure.", ex);
        }
    }
}