cpp/source/base/Protocol.cpp (116 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "Protocol.h"
ROCKETMQ_NAMESPACE_BEGIN
const char* protocolVersion() {
static const char* protocol_version = "v2";
return protocol_version;
}
bool writable(rmq::Permission p) {
switch (p) {
case rmq::Permission::WRITE:
case rmq::Permission::READ_WRITE:
return true;
default: {
return false;
}
}
}
bool readable(rmq::Permission p) {
switch (p) {
case rmq::Permission::READ:
case rmq::Permission::READ_WRITE:
return true;
default:
return false;
}
}
bool operator<(const rmq::Resource& lhs, const rmq::Resource& rhs) {
return lhs.resource_namespace() < rhs.resource_namespace() || lhs.name() < rhs.name();
}
bool operator==(const rmq::Resource& lhs, const rmq::Resource& rhs) {
return lhs.resource_namespace() == rhs.resource_namespace() && lhs.name() == rhs.name();
}
bool operator<(const rmq::Broker& lhs, const rmq::Broker& rhs) {
return lhs.name() < rhs.name() || lhs.id() < rhs.id();
}
bool operator==(const rmq::Broker& lhs, const rmq::Broker& rhs) {
return lhs.name() == rhs.name() && lhs.id() == rhs.id();
}
bool operator<(const rmq::MessageQueue& lhs, const rmq::MessageQueue& rhs) {
return lhs.topic() < rhs.topic() || lhs.id() < rhs.id() || lhs.broker() < rhs.broker() ||
lhs.permission() < rhs.permission();
}
bool operator==(const rmq::MessageQueue& lhs, const rmq::MessageQueue& rhs) {
return lhs.topic() == rhs.topic() && lhs.id() == rhs.id() && lhs.broker() == rhs.broker() &&
lhs.permission() == rhs.permission();
}
std::string simpleNameOf(const rmq::MessageQueue& m) {
return fmt::format("{}@{}@{}@{}",
m.topic().resource_namespace(), m.topic().name(), m.id(), m.broker().name());
}
bool operator==(const std::vector<rmq::MessageQueue>& lhs, const std::vector<rmq::MessageQueue>& rhs) {
if (lhs.size() != rhs.size()) {
return false;
}
for (std::size_t i = 0; i < lhs.size(); i++) {
if (lhs[i] == rhs[i]) {
continue;
}
return false;
}
return true;
}
bool operator!=(const std::vector<rmq::MessageQueue>& lhs, const std::vector<rmq::MessageQueue>& rhs) {
return !(lhs == rhs);
}
std::string urlOf(const rmq::MessageQueue& message_queue) {
const auto& endpoints = message_queue.broker().endpoints();
const auto& addresses = endpoints.addresses();
switch (endpoints.scheme()) {
case rmq::AddressScheme::DOMAIN_NAME: {
assert(addresses.size() == 1);
auto first = addresses.begin();
return fmt::format("dns:{}:{}", first->host(), first->port());
}
case rmq::AddressScheme::IPv4: {
assert(!addresses.empty());
auto it = addresses.cbegin();
std::string result = fmt::format("ipv4:{}:{}", it->host(), it->port());
for (++it; it != addresses.cend(); ++it) {
result.append(fmt::format(",{}:{}", it->host(), it->port()));
}
return result;
}
case rmq::AddressScheme::IPv6: {
assert(!addresses.empty());
auto it = addresses.cbegin();
std::string result = fmt::format("ipv6:{}:{}", it->host(), it->port());
for (++it; it != addresses.cend(); ++it) {
result.append(fmt::format(",{}:{}", it->host(), it->port()));
}
return result;
}
default: {
break;
}
}
return {};
}
bool operator<(const rmq::Assignment& lhs, const rmq::Assignment& rhs) {
return lhs.message_queue() < rhs.message_queue();
}
bool operator==(const rmq::Assignment& lhs, const rmq::Assignment& rhs) {
return lhs.message_queue() == rhs.message_queue();
}
bool operator==(const std::vector<rmq::Assignment>& lhs, const std::vector<rmq::Assignment>& rhs) {
if (lhs.size() != rhs.size()) {
return false;
}
for (std::size_t i = 0; i < lhs.size(); i++) {
if (lhs[i] == rhs[i]) {
continue;
}
return false;
}
return true;
}
ROCKETMQ_NAMESPACE_END