in inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java [594:707]
public SendMessageResponseB2P sendMessageP2B(SendMessageRequestP2B request,
final String rmtAddress,
boolean overtls) throws Throwable {
ProcessResult result = new ProcessResult();
final long startTime = System.currentTimeMillis();
final StringBuilder strBuffer = new StringBuilder(512);
SendMessageResponseB2P.Builder builder = SendMessageResponseB2P.newBuilder();
builder.setSuccess(false);
if (!this.started.get()
|| ServiceStatusHolder.isWriteServiceStop()) {
builder.setErrCode(TErrCodeConstants.SERVICE_UNAVAILABLE);
builder.setErrMsg("Write StoreService temporary unavailable!");
return builder.build();
}
if (!serverAuthHandler.identityValidUserInfo(request.getAuthInfo(), true, result)) {
builder.setErrCode(result.getErrCode());
builder.setErrMsg(result.getErrMsg());
return builder.build();
}
final CertifiedInfo certifiedInfo = (CertifiedInfo) result.getRetData();
// get and check clientId field
if (!PBParameterUtils.checkClientId(request.getClientId(), strBuffer, result)) {
builder.setErrCode(result.getErrCode());
builder.setErrMsg(result.getErrMsg());
return builder.build();
}
// get and check topicName and partitionId field
final int partitionId = request.getPartitionId();
if (!PBParameterUtils.getTopicNamePartIdInfo(true, request.getTopicName(),
partitionId, this.metadataManager, strBuffer, result)) {
builder.setErrCode(result.getErrCode());
builder.setErrMsg(result.getErrMsg());
return builder.build();
}
final TopicMetadata topicMetadata = (TopicMetadata) result.getRetData();
final String topicName = topicMetadata.getTopic();
String msgType = null;
int msgTypeCode = -1;
if (TStringUtils.isNotBlank(request.getMsgType())) {
msgType = request.getMsgType().trim();
msgTypeCode = msgType.hashCode();
}
final byte[] msgData = request.getData().toByteArray();
final int dataLength = msgData.length;
if (dataLength <= 0) {
builder.setErrCode(TErrCodeConstants.BAD_REQUEST);
builder.setErrMsg("data length is zero!");
return builder.build();
}
if (dataLength > topicMetadata.getMaxMsgSize()) {
builder.setErrCode(TErrCodeConstants.BAD_REQUEST);
builder.setErrMsg(strBuffer.append("data length over max length, allowed max length is ")
.append(topicMetadata.getMaxMsgSize())
.append(", data length is ").append(dataLength).toString());
return builder.build();
}
int checkSum = CheckSum.crc32(msgData);
if (request.getCheckSum() != -1 && checkSum != request.getCheckSum()) {
builder.setErrCode(TErrCodeConstants.FORBIDDEN);
builder.setErrMsg(strBuffer.append("Checksum msg data failure: ")
.append(request.getCheckSum()).append(" of ").append(topicName)
.append(" not equal to the data's checksum of ")
.append(checkSum).toString());
return builder.build();
}
if (!serverAuthHandler.validProduceAuthorizeInfo(
certifiedInfo.getUserName(), topicName, msgType, rmtAddress, result)) {
builder.setErrCode(result.getErrCode());
builder.setErrMsg(result.getErrMsg());
return builder.build();
}
try {
final MessageStore store =
this.storeManager.getOrCreateMessageStore(topicName, partitionId);
final AppendResult appendResult = new AppendResult();
if (store.appendMsg(appendResult, dataLength, checkSum, msgData,
msgTypeCode, request.getFlag(), partitionId, request.getSentAddr())) {
String baseKey = strBuffer.append(topicName)
.append("#").append(AddressUtils.intToIp(request.getSentAddr()))
.append("#").append(tubeConfig.getHostName())
.append("#").append(request.getPartitionId())
.append("#").append(request.getMsgTime()).toString();
putCounterGroup.add(baseKey, 1L, dataLength);
AuditUtils.addProduceRecord(topicName,
request.getMsgType(), request.getMsgTime(), 1, dataLength);
builder.setSuccess(true);
builder.setRequireAuth(certifiedInfo.isReAuth());
builder.setErrCode(TErrCodeConstants.SUCCESS);
// begin Deprecated, after 1.0, the ErrMsg set "Ok" or ""
builder.setErrMsg(String.valueOf(appendResult.getMsgId()));
// end Deprecated, after 1.0, the ErrMsg set "Ok" or ""
builder.setMessageId(appendResult.getMsgId());
builder.setAppendTime(appendResult.getAppendTime());
builder.setAppendOffset(appendResult.getAppendIndexOffset());
BrokerSrvStatsHolder.updSendMsgLatency(System.currentTimeMillis() - startTime);
return builder.build();
} else {
builder.setErrCode(TErrCodeConstants.SERVER_RECEIVE_OVERFLOW);
builder.setErrMsg(strBuffer.append("Put message failed from ")
.append(tubeConfig.getHostName())
.append(", server receive message overflow!").toString());
return builder.build();
}
} catch (final Throwable ex) {
logger.error("Put message failed ", ex);
strBuffer.delete(0, strBuffer.length());
builder.setSuccess(false);
builder.setErrCode(TErrCodeConstants.INTERNAL_SERVER_ERROR);
builder.setErrMsg(strBuffer.append("Put message failed from ")
.append(tubeConfig.getHostName()).append(" ")
.append((ex.getMessage() != null ? ex.getMessage() : " ")).toString());
return builder.build();
}
}