in auth/src/main/java/org/apache/rocketmq/auth/authorization/builder/DefaultAuthorizationContextBuilder.java [167:338]
public List<DefaultAuthorizationContext> build(ChannelHandlerContext context, RemotingCommand command) {
    // Translates a remoting request into the list of authorization contexts
    // (subject + resource + action(s) + source ip) that must be checked for it.
    //
    // - Returns an empty list when the command carries no ext fields.
    // - The subject is the user named by the ACCESS_KEY ext field, or null when
    //   that field is absent.
    // - Request codes without a dedicated case are delegated to
    //   buildContextByAnnotation(...).
    // - Any failure other than an AuthorizationException is wrapped in an
    //   AuthorizationException, so callers only ever see that exception type.
    List<DefaultAuthorizationContext> result = new ArrayList<>();
    try {
        HashMap<String, String> fields = command.getExtFields();
        if (MapUtils.isEmpty(fields)) {
            return result;
        }

        Subject subject = null;
        if (fields.containsKey(SessionCredentials.ACCESS_KEY)) {
            subject = User.of(fields.get(SessionCredentials.ACCESS_KEY));
        }

        // The remote address is split on its last colon; the part before it is used as the source ip.
        String remoteAddr = RemotingHelper.parseChannelRemoteAddr(context.channel());
        String sourceIp = StringUtils.substringBeforeLast(remoteAddr, CommonConstants.COLON);

        Resource topic;
        Resource group;
        switch (command.getCode()) {
            case RequestCode.GET_ROUTEINFO_BY_TOPIC:
                // A retry topic is authorized as a group resource; a normal topic as a topic resource.
                if (NamespaceUtil.isRetryTopic(fields.get(TOPIC))) {
                    group = Resource.ofGroup(fields.get(TOPIC));
                    result.add(DefaultAuthorizationContext.of(subject, group, Arrays.asList(Action.SUB, Action.GET), sourceIp));
                } else {
                    topic = Resource.ofTopic(fields.get(TOPIC));
                    result.add(DefaultAuthorizationContext.of(subject, topic, Arrays.asList(Action.PUB, Action.SUB, Action.GET), sourceIp));
                }
                break;
            case RequestCode.SEND_MESSAGE:
                if (NamespaceUtil.isRetryTopic(fields.get(TOPIC))) {
                    // Sending to a retry topic requires SUB on the group; the group is taken
                    // from the GROUP field when present, otherwise derived from the topic field.
                    if (StringUtils.isNotBlank(fields.get(GROUP))) {
                        group = Resource.ofGroup(fields.get(GROUP));
                    } else {
                        group = Resource.ofGroup(fields.get(TOPIC));
                    }
                    result.add(DefaultAuthorizationContext.of(subject, group, Action.SUB, sourceIp));
                } else {
                    topic = Resource.ofTopic(fields.get(TOPIC));
                    result.add(DefaultAuthorizationContext.of(subject, topic, Action.PUB, sourceIp));
                }
                break;
            case RequestCode.SEND_MESSAGE_V2:
            case RequestCode.SEND_BATCH_MESSAGE:
                // Same rules as SEND_MESSAGE; here key B is read as the topic and key A as the group.
                if (NamespaceUtil.isRetryTopic(fields.get(B))) {
                    if (StringUtils.isNotBlank(fields.get(A))) {
                        group = Resource.ofGroup(fields.get(A));
                    } else {
                        group = Resource.ofGroup(fields.get(B));
                    }
                    result.add(DefaultAuthorizationContext.of(subject, group, Action.SUB, sourceIp));
                } else {
                    topic = Resource.ofTopic(fields.get(B));
                    result.add(DefaultAuthorizationContext.of(subject, topic, Action.PUB, sourceIp));
                }
                break;
            case RequestCode.RECALL_MESSAGE:
                topic = Resource.ofTopic(fields.get(TOPIC));
                result.add(DefaultAuthorizationContext.of(subject, topic, Action.PUB, sourceIp));
                break;
            case RequestCode.END_TRANSACTION:
                // No context is produced when the request does not name a topic.
                if (StringUtils.isNotBlank(fields.get(TOPIC))) {
                    topic = Resource.ofTopic(fields.get(TOPIC));
                    result.add(DefaultAuthorizationContext.of(subject, topic, Action.PUB, sourceIp));
                }
                break;
            case RequestCode.CONSUMER_SEND_MSG_BACK:
                group = Resource.ofGroup(fields.get(GROUP));
                result.add(DefaultAuthorizationContext.of(subject, group, Action.SUB, sourceIp));
                break;
            case RequestCode.PULL_MESSAGE:
                // Retry topics get no topic-level check; the group check below always applies.
                if (!NamespaceUtil.isRetryTopic(fields.get(TOPIC))) {
                    topic = Resource.ofTopic(fields.get(TOPIC));
                    result.add(DefaultAuthorizationContext.of(subject, topic, Action.SUB, sourceIp));
                }
                group = Resource.ofGroup(fields.get(CONSUMER_GROUP));
                result.add(DefaultAuthorizationContext.of(subject, group, Action.SUB, sourceIp));
                break;
            case RequestCode.QUERY_MESSAGE:
                topic = Resource.ofTopic(fields.get(TOPIC));
                result.add(DefaultAuthorizationContext.of(subject, topic, Arrays.asList(Action.SUB, Action.GET), sourceIp));
                break;
            case RequestCode.HEART_BEAT:
                // One SUB context per consumer group, plus one per subscribed non-retry topic.
                HeartbeatData heartbeatData = HeartbeatData.decode(command.getBody(), HeartbeatData.class);
                for (ConsumerData consumerData : heartbeatData.getConsumerDataSet()) {
                    group = Resource.ofGroup(consumerData.getGroupName());
                    result.add(DefaultAuthorizationContext.of(subject, group, Action.SUB, sourceIp));
                    for (SubscriptionData subscriptionData : consumerData.getSubscriptionDataSet()) {
                        if (NamespaceUtil.isRetryTopic(subscriptionData.getTopic())) {
                            continue;
                        }
                        topic = Resource.ofTopic(subscriptionData.getTopic());
                        result.add(DefaultAuthorizationContext.of(subject, topic, Action.SUB, sourceIp));
                    }
                }
                break;
            case RequestCode.UNREGISTER_CLIENT:
                final UnregisterClientRequestHeader unregisterClientRequestHeader =
                    command.decodeCommandCustomHeader(UnregisterClientRequestHeader.class);
                // Only requests that name a consumer group produce a context.
                if (StringUtils.isNotBlank(unregisterClientRequestHeader.getConsumerGroup())) {
                    group = Resource.ofGroup(unregisterClientRequestHeader.getConsumerGroup());
                    result.add(DefaultAuthorizationContext.of(subject, group, Action.SUB, sourceIp));
                }
                break;
            case RequestCode.GET_CONSUMER_LIST_BY_GROUP:
                final GetConsumerListByGroupRequestHeader getConsumerListByGroupRequestHeader =
                    command.decodeCommandCustomHeader(GetConsumerListByGroupRequestHeader.class);
                group = Resource.ofGroup(getConsumerListByGroupRequestHeader.getConsumerGroup());
                result.add(DefaultAuthorizationContext.of(subject, group, Arrays.asList(Action.SUB, Action.GET), sourceIp));
                break;
            case RequestCode.QUERY_CONSUMER_OFFSET:
                final QueryConsumerOffsetRequestHeader queryConsumerOffsetRequestHeader =
                    command.decodeCommandCustomHeader(QueryConsumerOffsetRequestHeader.class);
                if (!NamespaceUtil.isRetryTopic(queryConsumerOffsetRequestHeader.getTopic())) {
                    topic = Resource.ofTopic(queryConsumerOffsetRequestHeader.getTopic());
                    result.add(DefaultAuthorizationContext.of(subject, topic, Arrays.asList(Action.SUB, Action.GET), sourceIp));
                }
                group = Resource.ofGroup(queryConsumerOffsetRequestHeader.getConsumerGroup());
                result.add(DefaultAuthorizationContext.of(subject, group, Arrays.asList(Action.SUB, Action.GET), sourceIp));
                break;
            case RequestCode.UPDATE_CONSUMER_OFFSET:
                final UpdateConsumerOffsetRequestHeader updateConsumerOffsetRequestHeader =
                    command.decodeCommandCustomHeader(UpdateConsumerOffsetRequestHeader.class);
                if (!NamespaceUtil.isRetryTopic(updateConsumerOffsetRequestHeader.getTopic())) {
                    topic = Resource.ofTopic(updateConsumerOffsetRequestHeader.getTopic());
                    result.add(DefaultAuthorizationContext.of(subject, topic, Arrays.asList(Action.SUB, Action.UPDATE), sourceIp));
                }
                group = Resource.ofGroup(updateConsumerOffsetRequestHeader.getConsumerGroup());
                result.add(DefaultAuthorizationContext.of(subject, group, Arrays.asList(Action.SUB, Action.UPDATE), sourceIp));
                break;
            case RequestCode.LOCK_BATCH_MQ:
                LockBatchRequestBody lockBatchRequestBody =
                    LockBatchRequestBody.decode(command.getBody(), LockBatchRequestBody.class);
                group = Resource.ofGroup(lockBatchRequestBody.getConsumerGroup());
                result.add(DefaultAuthorizationContext.of(subject, group, Action.SUB, sourceIp));
                if (CollectionUtils.isNotEmpty(lockBatchRequestBody.getMqSet())) {
                    for (MessageQueue messageQueue : lockBatchRequestBody.getMqSet()) {
                        if (NamespaceUtil.isRetryTopic(messageQueue.getTopic())) {
                            continue;
                        }
                        topic = Resource.ofTopic(messageQueue.getTopic());
                        result.add(DefaultAuthorizationContext.of(subject, topic, Action.SUB, sourceIp));
                    }
                }
                break;
            case RequestCode.UNLOCK_BATCH_MQ:
                // Decode through the class being decoded, matching the HEART_BEAT and
                // LOCK_BATCH_MQ cases, instead of through the sibling LockBatchRequestBody.
                UnlockBatchRequestBody unlockBatchRequestBody =
                    UnlockBatchRequestBody.decode(command.getBody(), UnlockBatchRequestBody.class);
                group = Resource.ofGroup(unlockBatchRequestBody.getConsumerGroup());
                result.add(DefaultAuthorizationContext.of(subject, group, Action.SUB, sourceIp));
                if (CollectionUtils.isNotEmpty(unlockBatchRequestBody.getMqSet())) {
                    for (MessageQueue messageQueue : unlockBatchRequestBody.getMqSet()) {
                        if (NamespaceUtil.isRetryTopic(messageQueue.getTopic())) {
                            continue;
                        }
                        topic = Resource.ofTopic(messageQueue.getTopic());
                        result.add(DefaultAuthorizationContext.of(subject, topic, Action.SUB, sourceIp));
                    }
                }
                break;
            default:
                // Replaces (rather than appends to) the result list for all other request codes.
                result = buildContextByAnnotation(subject, command, sourceIp);
                break;
        }

        // Stamp every produced context with the originating channel and request code.
        // Both values are the same for every context, so compute them once.
        if (CollectionUtils.isNotEmpty(result)) {
            String channelId = context.channel().id().asLongText();
            String rpcCode = String.valueOf(command.getCode());
            for (DefaultAuthorizationContext authorizationContext : result) {
                authorizationContext.setChannelId(channelId);
                authorizationContext.setRpcCode(rpcCode);
            }
        }
    } catch (AuthorizationException ex) {
        // Already the exception type callers expect; propagate untouched.
        throw ex;
    } catch (Throwable t) {
        // Decoding/parsing failures of any kind are reported as authorization failures.
        throw new AuthorizationException("parse authorization context error.", t);
    }
    return result;
}