aios/apps/facility/swift/monitor/BrokerMetricsReporter.cpp (766 lines of code) (raw):

/* * Copyright 2014-present Alibaba Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include "swift/monitor/BrokerMetricsReporter.h" #include <iosfwd> #include <utility> #include "autil/CommonMacros.h" #include "autil/HashAlgorithm.h" #include "autil/LockFreeThreadPool.h" #include "autil/StringUtil.h" #include "autil/ThreadPool.h" #include "autil/TimeUtility.h" #include "autil/WorkItemQueue.h" #include "kmonitor/client/MetricLevel.h" #include "kmonitor/client/MetricMacro.h" #include "kmonitor/client/core/MetricsTags.h" #include "kmonitor/client/core/MutableMetric.h" #include "swift/monitor/MetricsCommon.h" #include "swift/protocol/Common.pb.h" using namespace kmonitor; using namespace std; using namespace autil; namespace swift { namespace monitor { AUTIL_LOG_SETUP(swift, BrokerMetricsReporter); static size_t CLEAR_TAGS_MAP_THRESHOLD = 100000; BrokerMetricsReporter::BrokerMetricsReporter(int reportThreadNum) { _metricsReporter = new MetricsReporter("swift", "", {}); if (reportThreadNum > 0) { _threadPool = new LockFreeThreadPool(reportThreadNum, 10000, nullptr, "metrics"); if (!_threadPool->start()) { AUTIL_LOG(ERROR, "start backup report metric thread failed."); DELETE_AND_SET_NULL(_threadPool); } } } BrokerMetricsReporter::~BrokerMetricsReporter() { if (_threadPool) { _threadPool->stop(); DELETE_AND_SET_NULL(_threadPool); } DELETE_AND_SET_NULL(_metricsReporter); } LongPollingMetrics::LongPollingMetrics() {} bool LongPollingMetrics::init(MetricsGroupManager *manager) { SWIFT_REGISTER_GAUGE_METRIC(longPollingHoldCount, BROKER_WORKER_GROUP_STATUS_METRIC); SWIFT_REGISTER_GAUGE_METRIC(longPollingTimeoutCount, BROKER_WORKER_GROUP_STATUS_METRIC); SWIFT_REGISTER_GAUGE_METRIC(longPollingEnqueueTimes, BROKER_WORKER_GROUP_STATUS_METRIC); SWIFT_REGISTER_GAUGE_METRIC(longPollingLatencyOnce, BROKER_WORKER_GROUP_STATUS_METRIC); SWIFT_REGISTER_GAUGE_METRIC(longPollingLatencyAcc, BROKER_WORKER_GROUP_STATUS_METRIC); SWIFT_REGISTER_GAUGE_METRIC(longPollingLatencyActive, BROKER_WORKER_GROUP_STATUS_METRIC); SWIFT_REGISTER_GAUGE_METRIC(readRequestToCurTimeInterval, BROKER_WORKER_GROUP_STATUS_METRIC); SWIFT_REGISTER_GAUGE_METRIC(readMsgToCurTimeInterval, BROKER_WORKER_GROUP_STATUS_METRIC); SWIFT_REGISTER_GAUGE_METRIC(lastAcceptedMsgToCurTimeInterval, BROKER_WORKER_GROUP_STATUS_METRIC); return true; } void LongPollingMetrics::report(const MetricsTags *tags, LongPollingMetricsCollector *collector) { SWIFT_REPORT_MUTABLE_METRIC(longPollingHoldCount, collector->longPollingHoldCount); SWIFT_REPORT_MUTABLE_METRIC(longPollingTimeoutCount, collector->longPollingTimeoutCount); SWIFT_REPORT_MUTABLE_METRIC(longPollingEnqueueTimes, collector->longPollingEnqueueTimes); SWIFT_REPORT_MUTABLE_METRIC(longPollingLatencyOnce, collector->longPollingLatencyOnce); SWIFT_REPORT_MUTABLE_METRIC(longPollingLatencyAcc, collector->longPollingLatencyAcc); SWIFT_REPORT_MUTABLE_METRIC(longPollingLatencyActive, collector->longPollingLatencyActive); SWIFT_REPORT_MUTABLE_METRIC(readRequestToCurTimeInterval, collector->readRequestToCurTimeInterval); SWIFT_REPORT_MUTABLE_METRIC(readMsgToCurTimeInterval, collector->readMsgToCurTimeInterval); SWIFT_REPORT_MUTABLE_METRIC(lastAcceptedMsgToCurTimeInterval, collector->lastAcceptedMsgToCurTimeInterval); } PartitionMetrics::PartitionMetrics() {} bool PartitionMetrics::init(MetricsGroupManager *manager) { SWIFT_REGISTER_GAUGE_METRIC(lastReadMsgTimeInterval, PARTITION_GROUP_METRIC); SWIFT_REGISTER_GAUGE_METRIC(numOfWriteRequestToProcess, WRITE_REQUEST_SECURITY_GROUP_METRIC); SWIFT_REGISTER_GAUGE_METRIC(dataSizeOfWriteRequestToProcess, WRITE_REQUEST_SECURITY_GROUP_METRIC); SWIFT_REGISTER_STATUS_METRIC(partitionDataSizeInBuffer, PARTITION_MEMORY_GROUP_METRIC); SWIFT_REGISTER_STATUS_METRIC(partitionMetaSizeInBuffer, PARTITION_MEMORY_GROUP_METRIC); SWIFT_REGISTER_GAUGE_METRIC(leftToBeCommittedDataSize, PARTITION_MEMORY_GROUP_METRIC); SWIFT_REGISTER_GAUGE_METRIC(leftToBeCommittedDataUseBufferRatio, PARTITION_MEMORY_GROUP_METRIC); SWIFT_REGISTER_GAUGE_METRIC(lastAcceptedMsgTimeInterval, PARTITION_GROUP_METRIC); SWIFT_REGISTER_STATUS_METRIC(partitionOldestMsgTimeStampInBuffer, PARTITION_GROUP_METRIC); SWIFT_REGISTER_GAUGE_METRIC(partitionCommitDelay, PARTITION_GROUP_METRIC); SWIFT_REGISTER_STATUS_METRIC(partitionUsedBufferSize, PARTITION_MEMORY_GROUP_METRIC); SWIFT_REGISTER_STATUS_METRIC(partitionUnusedMaxBufferSize, PARTITION_MEMORY_GROUP_METRIC); SWIFT_REGISTER_STATUS_METRIC(partitionReserveFreeBufferSize, PARTITION_MEMORY_GROUP_METRIC); SWIFT_REGISTER_GAUGE_METRIC(maxMsgId, PARTITION_GROUP_METRIC); SWIFT_REGISTER_STATUS_METRIC(msgCountInBuffer, PARTITION_GROUP_METRIC); SWIFT_REGISTER_GAUGE_METRIC(partitionStatus, PARTITION_GROUP_METRIC); SWIFT_REGISTER_GAUGE_METRIC(writeDataFileCount, PARTITION_GROUP_METRIC); SWIFT_REGISTER_GAUGE_METRIC(writeMetaFileCount, PARTITION_GROUP_METRIC); SWIFT_REGISTER_GAUGE_METRIC(writeFailMetaFileCount, PARTITION_GROUP_METRIC); SWIFT_REGISTER_GAUGE_METRIC(writeFailDataFileCount, PARTITION_GROUP_METRIC); SWIFT_REGISTER_GAUGE_METRIC(writeBufferSize, PARTITION_GROUP_METRIC); SWIFT_REGISTER_GAUGE_METRIC(cacheFileCount, PARTITION_GROUP_METRIC); SWIFT_REGISTER_GAUGE_METRIC(cacheMetaBlockCount, PARTITION_GROUP_METRIC); SWIFT_REGISTER_GAUGE_METRIC(cacheDataBlockCount, PARTITION_GROUP_METRIC); SWIFT_REGISTER_GAUGE_METRIC(partitionUsedDfsSize, PARTITION_DFS_GROUP_METRIC); return true; } void PartitionMetrics::report(const MetricsTags *tags, PartitionMetricsCollector *collector) { int64_t currentTime = autil::TimeUtility::currentTime(); SWIFT_REPORT_MUTABLE_METRIC(lastAcceptedMsgTimeInterval, autil::TimeUtility::us2ms(currentTime - collector->lastAcceptedMsgTimeStamp)); int64_t lastReadMsgTimeInter = autil::TimeUtility::us2ms(collector->lastAcceptedMsgTimeStamp - collector->lastReadMsgTimeStamp); SWIFT_REPORT_MUTABLE_METRIC(lastReadMsgTimeInterval, lastReadMsgTimeInter); if (collector->oldestMsgTimeStampInBuffer >= 0) { REPORT_MUTABLE_METRIC(partitionOldestMsgTimeStampInBuffer, collector->oldestMsgTimeStampInBuffer / 1000000.0); // second } SWIFT_REPORT_MUTABLE_METRIC(partitionCommitDelay, collector->commitDelay / 1000.0); // ms SWIFT_REPORT_MUTABLE_METRIC(leftToBeCommittedDataSize, collector->leftToBeCommittedDataSize); SWIFT_REPORT_MUTABLE_METRIC(leftToBeCommittedDataUseBufferRatio, 100.0 * collector->leftToBeCommittedDataSize / (double)(collector->usedBufferSize + collector->unusedMaxBufferSize)); if (protocol::TOPIC_MODE_SECURITY == collector->topicMode) { SWIFT_REPORT_MUTABLE_METRIC(numOfWriteRequestToProcess, collector->writeRequestQueueSize); SWIFT_REPORT_MUTABLE_METRIC(dataSizeOfWriteRequestToProcess, collector->writeRequestQueueDataSize); } SWIFT_REPORT_MUTABLE_METRIC(partitionUsedBufferSize, collector->usedBufferSize); SWIFT_REPORT_MUTABLE_METRIC(partitionDataSizeInBuffer, collector->actualDataSize); SWIFT_REPORT_MUTABLE_METRIC(partitionMetaSizeInBuffer, collector->actualMetaSize); SWIFT_REPORT_MUTABLE_METRIC(partitionUnusedMaxBufferSize, collector->unusedMaxBufferSize); SWIFT_REPORT_MUTABLE_METRIC(partitionReserveFreeBufferSize, collector->reserveFreeBufferSize); SWIFT_REPORT_MUTABLE_METRIC(maxMsgId, collector->maxMsgId); SWIFT_REPORT_MUTABLE_METRIC(msgCountInBuffer, collector->msgCountInBuffer); SWIFT_REPORT_MUTABLE_METRIC(partitionStatus, collector->partitionStatus); SWIFT_REPORT_MUTABLE_METRIC(writeMetaFileCount, collector->writeMetaFileCount); SWIFT_REPORT_MUTABLE_METRIC(writeFailMetaFileCount, collector->writeFailMetaFileCount); SWIFT_REPORT_MUTABLE_METRIC(writeDataFileCount, collector->writeDataFileCount); SWIFT_REPORT_MUTABLE_METRIC(writeFailDataFileCount, collector->writeFailDataFileCount); SWIFT_REPORT_MUTABLE_METRIC(writeBufferSize, collector->writeBufferSize); SWIFT_REPORT_MUTABLE_METRIC(cacheFileCount, collector->cacheFileCount); SWIFT_REPORT_MUTABLE_METRIC(cacheMetaBlockCount, collector->cacheMetaBlockCount); SWIFT_REPORT_MUTABLE_METRIC(cacheDataBlockCount, collector->cacheDataBlockCount); SWIFT_REPORT_MUTABLE_METRIC(partitionUsedDfsSize, collector->usedDfsSize); } bool WorkerMetrics::init(MetricsGroupManager *manager) { SWIFT_REGISTER_GAUGE_METRIC(fileCacheUsedSize, BROKER_WORKER_GROUP_METRIC); SWIFT_REGISTER_GAUGE_METRIC(fileCacheUsedRatio, BROKER_WORKER_GROUP_METRIC); SWIFT_REGISTER_GAUGE_METRIC(loadedPartitionCount, BROKER_WORKER_GROUP_METRIC); SWIFT_REGISTER_STATUS_METRIC(messageTotalUsedBufferSize, BROKER_WORKER_GROUP_METRIC); // byte SWIFT_REGISTER_STATUS_METRIC(messageTotalUnusedBufferSize, BROKER_WORKER_GROUP_METRIC); // byte SWIFT_REGISTER_GAUGE_METRIC(serviceStatus, BROKER_WORKER_GROUP_METRIC); SWIFT_REGISTER_GAUGE_METRIC(rejectTime, BROKER_WORKER_GROUP_METRIC); // ms SWIFT_REGISTER_GAUGE_METRIC(commitMessageQueueSize, BROKER_WORKER_GROUP_METRIC); SWIFT_REGISTER_GAUGE_METRIC(commitMessageQueueActiveThreadNum, BROKER_WORKER_GROUP_METRIC); SWIFT_REGISTER_GAUGE_METRIC(readFileUsedThreadCount, BROKER_WORKER_DFS_GROUP_METRIC); SWIFT_REGISTER_GAUGE_METRIC(actualReadingFileThreadCount, BROKER_WORKER_DFS_GROUP_METRIC); SWIFT_REGISTER_GAUGE_METRIC(actualReadingFileCount, BROKER_WORKER_DFS_GROUP_METRIC); SWIFT_REGISTER_GAUGE_METRIC(writeUsedThreadCount, BROKER_WORKER_GROUP_METRIC); SWIFT_REGISTER_GAUGE_METRIC(maxMsgIdUsedThreadCount, BROKER_WORKER_GROUP_METRIC); SWIFT_REGISTER_GAUGE_METRIC(minMsgIdUsedThreadCount, BROKER_WORKER_GROUP_METRIC); return true; } void WorkerMetrics::report(const MetricsTags *tags, WorkerMetricsCollector *collector) { SWIFT_REPORT_MUTABLE_METRIC(fileCacheUsedSize, collector->fileCacheUsedSize); SWIFT_REPORT_MUTABLE_METRIC(fileCacheUsedRatio, collector->fileCacheUsedRatio); SWIFT_REPORT_MUTABLE_METRIC(loadedPartitionCount, collector->loadedPartitionCount); SWIFT_REPORT_MUTABLE_METRIC(messageTotalUsedBufferSize, collector->messageTotalUsedBufferSize); SWIFT_REPORT_MUTABLE_METRIC(messageTotalUnusedBufferSize, collector->messageTotalUnusedBufferSize); REPORT_MUTABLE_METRIC(serviceStatus, collector->serviceStatus); SWIFT_REPORT_MUTABLE_METRIC(rejectTime, collector->rejectTime); SWIFT_REPORT_MUTABLE_METRIC(commitMessageQueueSize, collector->commitMessageQueueSize); SWIFT_REPORT_MUTABLE_METRIC(commitMessageQueueActiveThreadNum, collector->commitMessageActiveThreadNum); SWIFT_REPORT_MUTABLE_METRIC(readFileUsedThreadCount, collector->readFileUsedThreadCount); SWIFT_REPORT_MUTABLE_METRIC(actualReadingFileThreadCount, collector->actualReadingFileThreadCount); SWIFT_REPORT_MUTABLE_METRIC(actualReadingFileCount, collector->actualReadingFileCount); SWIFT_REPORT_MUTABLE_METRIC(writeUsedThreadCount, collector->writeUsedThreadCount); SWIFT_REPORT_MUTABLE_METRIC(maxMsgIdUsedThreadCount, collector->maxMsgIdUsedThreadCount); SWIFT_REPORT_MUTABLE_METRIC(minMsgIdUsedThreadCount, collector->minMsgIdUsedThreadCount); } bool BrokerInStatusMetrics::init(kmonitor::MetricsGroupManager *manager) { SWIFT_REGISTER_GAUGE_METRIC(cpu, BROKER_WORKER_GROUP_STATUS_METRIC); SWIFT_REGISTER_GAUGE_METRIC(mem, BROKER_WORKER_GROUP_STATUS_METRIC); SWIFT_REGISTER_GAUGE_METRIC(writeRate1min, BROKER_WORKER_GROUP_STATUS_METRIC); SWIFT_REGISTER_GAUGE_METRIC(writeRate5min, BROKER_WORKER_GROUP_STATUS_METRIC); SWIFT_REGISTER_GAUGE_METRIC(writeRequest1min, BROKER_WORKER_GROUP_STATUS_METRIC); SWIFT_REGISTER_GAUGE_METRIC(writeRequest5min, BROKER_WORKER_GROUP_STATUS_METRIC); SWIFT_REGISTER_GAUGE_METRIC(readRate1min, BROKER_WORKER_GROUP_STATUS_METRIC); SWIFT_REGISTER_GAUGE_METRIC(readRate5min, BROKER_WORKER_GROUP_STATUS_METRIC); SWIFT_REGISTER_GAUGE_METRIC(readRequest1min, BROKER_WORKER_GROUP_STATUS_METRIC); SWIFT_REGISTER_GAUGE_METRIC(readRequest5min, BROKER_WORKER_GROUP_STATUS_METRIC); SWIFT_REGISTER_GAUGE_METRIC(commitDelay, BROKER_WORKER_GROUP_STATUS_METRIC); SWIFT_REGISTER_GAUGE_METRIC(zfsTimeout, BROKER_WORKER_GROUP_STATUS_METRIC); return true; } void BrokerInStatusMetrics::report(const MetricsTags *tags, BrokerInStatusCollector *collector) { REPORT_MUTABLE_METRIC(cpu, collector->cpu); REPORT_MUTABLE_METRIC(mem, collector->mem); REPORT_MUTABLE_METRIC(writeRate1min, collector->writeRate1min); REPORT_MUTABLE_METRIC(writeRate5min, collector->writeRate5min); REPORT_MUTABLE_METRIC(writeRequest1min, collector->writeRequest1min); REPORT_MUTABLE_METRIC(writeRequest5min, collector->writeRequest5min); REPORT_MUTABLE_METRIC(readRate1min, collector->readRate1min); REPORT_MUTABLE_METRIC(readRate5min, collector->readRate5min); REPORT_MUTABLE_METRIC(readRequest1min, collector->readRequest1min); REPORT_MUTABLE_METRIC(readRequest5min, collector->readRequest5min); SWIFT_REPORT_MUTABLE_METRIC(commitDelay, collector->commitDelay); SWIFT_REPORT_MUTABLE_METRIC(zfsTimeout, collector->zfsTimeout); } bool BrokerSysMetrics::init(MetricsGroupManager *manager) { SWIFT_REGISTER_GAUGE_METRIC(getMsgQueueSize, BROKER_WORKER_GROUP_METRIC); SWIFT_REGISTER_GAUGE_METRIC(readUsedThreadCount, BROKER_WORKER_GROUP_METRIC); SWIFT_REGISTER_GAUGE_METRIC(brokerQueueSize, BROKER_WORKER_GROUP_METRIC); SWIFT_REGISTER_GAUGE_METRIC(holdQueueSize, BROKER_WORKER_GROUP_METRIC); SWIFT_REGISTER_GAUGE_METRIC(holdTimeoutCount, BROKER_WORKER_GROUP_METRIC); SWIFT_REGISTER_GAUGE_METRIC(clearPollingQueueSize, BROKER_WORKER_GROUP_METRIC); return true; } void BrokerSysMetrics::report(const MetricsTags *tags, BrokerSysMetricsCollector *collector) { SWIFT_REPORT_MUTABLE_METRIC(getMsgQueueSize, collector->getMsgQueueSize); SWIFT_REPORT_MUTABLE_METRIC(readUsedThreadCount, collector->readThreadNum); SWIFT_REPORT_MUTABLE_METRIC(brokerQueueSize, collector->brokerQueueSize); SWIFT_REPORT_MUTABLE_METRIC(holdQueueSize, collector->holdQueueSize); SWIFT_REPORT_MUTABLE_METRIC(holdTimeoutCount, collector->holdTimeoutCount); SWIFT_REPORT_MUTABLE_METRIC(clearPollingQueueSize, collector->clearPollingQueueSize); } bool WriteMetrics::init(MetricsGroupManager *manager) { // broker partition SWIFT_REGISTER_QPS_METRIC(writeRequestQps, WRITE_REQUEST_GROUP_METRIC); SWIFT_REGISTER_GAUGE_METRIC(requestDecompressedRatio, WRITE_REQUEST_GROUP_METRIC); SWIFT_REGISTER_GAUGE_METRIC(requestDecompressedLatency, WRITE_REQUEST_GROUP_METRIC); SWIFT_REGISTER_QPS_METRIC(writeBusyErrorQps, PARTITION_ERROR_GROUP_METRIC); SWIFT_REGISTER_QPS_METRIC(otherWriteErrorQps, PARTITION_ERROR_GROUP_METRIC); SWIFT_REGISTER_QPS_METRIC(writeVersionErrorQps, PARTITION_ERROR_GROUP_METRIC); // brain SWIFT_REGISTER_GAUGE_METRIC(msgsSizePerWriteRequest, WRITE_REQUEST_GROUP_METRIC); SWIFT_REGISTER_GAUGE_METRIC(msgsCountPerWriteRequest, WRITE_REQUEST_GROUP_METRIC); SWIFT_REGISTER_GAUGE_METRIC(msgsCountWithMergedPerWriteRequest, WRITE_REQUEST_GROUP_METRIC); SWIFT_REGISTER_GAUGE_METRIC(deniedNumOfWriteRequestWithError, WRITE_REQUEST_GROUP_METRIC); SWIFT_REGISTER_GAUGE_METRIC(msgCompressedRatio, WRITE_MSG_GROUP_METRIC); SWIFT_REGISTER_GAUGE_METRIC(msgCompressedLatency, WRITE_MSG_GROUP_METRIC); SWIFT_REGISTER_GAUGE_METRIC(writeRequestLatency, WRITE_REQUEST_GROUP_METRIC); SWIFT_REGISTER_GAUGE_METRIC(writeMsgDecompressedRatio, WRITE_MSG_GROUP_METRIC); SWIFT_REGISTER_GAUGE_METRIC(writeMsgDecompressedLatency, WRITE_MSG_GROUP_METRIC); // security SWIFT_REGISTER_QPS_METRIC(timeoutNumOfWriteRequest, WRITE_REQUEST_SECURITY_GROUP_METRIC); // group SWIFT_REGISTER_QPS_METRIC(acceptedMsgQps, WRITE_MSG_GROUP_METRIC); SWIFT_REGISTER_QPS_METRIC(mergedAcceptedMsgQps, WRITE_MSG_GROUP_METRIC); SWIFT_REGISTER_QPS_METRIC(totalAcceptedMsgQps, WRITE_MSG_GROUP_METRIC); SWIFT_REGISTER_GAUGE_METRIC(acceptedMsgSize, WRITE_MSG_GROUP_METRIC); SWIFT_REGISTER_GAUGE_METRIC(acceptedMsgsCountPerWriteRequest, WRITE_REQUEST_GROUP_METRIC); SWIFT_REGISTER_GAUGE_METRIC(acceptedMsgsSizePerWriteRequest, WRITE_REQUEST_GROUP_METRIC); SWIFT_REGISTER_QPS_METRIC(totalAcceptedMsgsSize, WRITE_MSG_GROUP_METRIC); SWIFT_REGISTER_QPS_METRIC(acceptedMsgQps_FB, WRITE_MSG_GROUP_METRIC); SWIFT_REGISTER_QPS_METRIC(acceptedMsgQps_PB, WRITE_MSG_GROUP_METRIC); SWIFT_REGISTER_QPS_METRIC(acceptedCompressMsgQps, WRITE_MSG_GROUP_METRIC); SWIFT_REGISTER_QPS_METRIC(acceptedUncompressMsgQps, WRITE_MSG_GROUP_METRIC); return true; } void WriteMetrics::report(const MetricsTags *tags, WriteMetricsCollector *collector) { // broker partition SWIFT_REPORT_QPS_METRIC(writeRequestQps); SWIFT_REPORT_MUTABLE_METRIC(requestDecompressedRatio, collector->requestDecompressedRatio); SWIFT_REPORT_MUTABLE_METRIC(requestDecompressedLatency, collector->requestDecompressedLatency); if (collector->writeBusyError) { SWIFT_REPORT_QPS_METRIC(writeBusyErrorQps); } if (collector->otherWriteError) { SWIFT_REPORT_QPS_METRIC(otherWriteErrorQps); } if (collector->writeVersionError) { SWIFT_REPORT_QPS_METRIC(writeVersionErrorQps); } // brain SWIFT_REPORT_MUTABLE_METRIC(msgsSizePerWriteRequest, collector->msgsSizePerWriteRequest); SWIFT_REPORT_MUTABLE_METRIC(msgsCountPerWriteRequest, collector->msgsCountPerWriteRequest); SWIFT_REPORT_MUTABLE_METRIC(msgsCountWithMergedPerWriteRequest, collector->msgsCountWithMergedPerWriteRequest); SWIFT_REPORT_MUTABLE_METRIC(deniedNumOfWriteRequestWithError, collector->deniedNumOfWriteRequestWithError); SWIFT_REPORT_MUTABLE_METRIC(msgCompressedRatio, collector->msgCompressedRatio); SWIFT_REPORT_MUTABLE_METRIC(msgCompressedLatency, collector->msgCompressedLatency); SWIFT_REPORT_MUTABLE_METRIC(writeRequestLatency, collector->writeRequestLatency); SWIFT_REPORT_MUTABLE_METRIC(writeMsgDecompressedRatio, collector->writeMsgDecompressedRatio); SWIFT_REPORT_MUTABLE_METRIC(writeMsgDecompressedLatency, collector->writeMsgDecompressedLatency); // security SWIFT_REPORT_MUTABLE_METRIC(timeoutNumOfWriteRequest, collector->timeoutNumOfWriteRequest); // group SWIFT_REPORT_MUTABLE_METRIC(acceptedMsgQps, collector->acceptedMsgCount); if (collector->mergedAcceptedMsgCount > 0) { REPORT_MUTABLE_METRIC(mergedAcceptedMsgQps, collector->mergedAcceptedMsgCount); } SWIFT_REPORT_MUTABLE_METRIC(totalAcceptedMsgQps, collector->totalAcceptedMsgCount); SWIFT_REPORT_MUTABLE_METRIC(acceptedMsgSize, collector->acceptedMsgSize); SWIFT_REPORT_MUTABLE_METRIC(acceptedMsgsCountPerWriteRequest, collector->acceptedMsgsCountPerWriteRequest); SWIFT_REPORT_MUTABLE_METRIC(acceptedMsgsSizePerWriteRequest, collector->acceptedMsgsSizePerWriteRequest); SWIFT_REPORT_MUTABLE_METRIC(totalAcceptedMsgsSize, collector->totalAcceptedMsgsSize); SWIFT_REPORT_MUTABLE_METRIC(acceptedMsgQps_FB, collector->acceptedMsgCount_FB); SWIFT_REPORT_MUTABLE_METRIC(acceptedMsgQps_PB, collector->acceptedMsgCount_PB); SWIFT_REPORT_MUTABLE_METRIC(acceptedCompressMsgQps, collector->acceptedCompressMsgCount); SWIFT_REPORT_MUTABLE_METRIC(acceptedUncompressMsgQps, collector->acceptedUncompressMsgCount); } bool ReadMetrics::init(MetricsGroupManager *manager) { // broker partition SWIFT_REGISTER_QPS_METRIC(readRequestQps, READ_REQUEST_GROUP_METRIC); SWIFT_REGISTER_GAUGE_METRIC(requiredMsgsCountPerReadRequest, READ_REQUEST_GROUP_METRIC); SWIFT_REGISTER_QPS_METRIC(readBusyErrorQps, PARTITION_ERROR_GROUP_METRIC); SWIFT_REGISTER_QPS_METRIC(messageLostErrorQps, PARTITION_ERROR_GROUP_METRIC); SWIFT_REGISTER_QPS_METRIC(brokerNodataErrorQps, PARTITION_ERROR_GROUP_METRIC); SWIFT_REGISTER_QPS_METRIC(otherReadErrorQps, PARTITION_ERROR_GROUP_METRIC); SWIFT_REGISTER_GAUGE_METRIC(readRequestLatency, READ_REQUEST_GROUP_METRIC); SWIFT_REGISTER_GAUGE_METRIC(readMsgDecompressedRatio, READ_MSG_GROUP_METRIC); SWIFT_REGISTER_GAUGE_METRIC(readMsgDecompressedLatency, READ_MSG_GROUP_METRIC); SWIFT_REGISTER_GAUGE_METRIC(requestCompressedRatio, READ_MSG_GROUP_METRIC); SWIFT_REGISTER_GAUGE_METRIC(msgCompressedLatency, READ_MSG_GROUP_METRIC); SWIFT_REGISTER_QPS_METRIC(unpackMsgQPS, READ_MSG_GROUP_METRIC); // brain SWIFT_REGISTER_QPS_METRIC(readMsgQPS, READ_MSG_GROUP_METRIC); SWIFT_REGISTER_QPS_METRIC(readMsgQPS_PB, READ_MSG_GROUP_METRIC); SWIFT_REGISTER_QPS_METRIC(readMsgQPS_FB, READ_MSG_GROUP_METRIC); SWIFT_REGISTER_GAUGE_METRIC(returnedMsgSize, READ_MSG_GROUP_METRIC); SWIFT_REGISTER_GAUGE_METRIC(returnedMsgsSizePerReadRequest, READ_REQUEST_GROUP_METRIC); SWIFT_REGISTER_GAUGE_METRIC(returnedMsgsCountPerReadRequest, READ_REQUEST_GROUP_METRIC); SWIFT_REGISTER_GAUGE_METRIC(returnedMsgsCountWithMergePerReadRequest, READ_REQUEST_GROUP_METRIC); SWIFT_REGISTER_QPS_METRIC(mergedReadMsgQPS, READ_MSG_GROUP_METRIC); SWIFT_REGISTER_QPS_METRIC(totalReadMsgQPS, READ_MSG_GROUP_METRIC); SWIFT_REGISTER_QPS_METRIC(readLimitQps, READ_REQUEST_GROUP_METRIC); // field filter SWIFT_REGISTER_GAUGE_METRIC(fieldFilterReadMsgCountPerReadRequest, READ_REQUEST_GROUP_METRIC); SWIFT_REGISTER_GAUGE_METRIC(fieldFilterUpdatedMsgRatio, READ_MSG_GROUP_METRIC); SWIFT_REGISTER_GAUGE_METRIC(fieldFilteredMsgSize, READ_MSG_GROUP_METRIC); SWIFT_REGISTER_GAUGE_METRIC(rawFieldFilteredMsgSize, READ_MSG_GROUP_METRIC); SWIFT_REGISTER_GAUGE_METRIC(fieldFilteredMsgRatio, READ_MSG_GROUP_METRIC); SWIFT_REGISTER_GAUGE_METRIC(fieldFilterMsgsSizePerReadRequest, READ_REQUEST_GROUP_METRIC); // group SWIFT_REGISTER_QPS_METRIC(msgReadQpsFromMemory, READ_MSG_GROUP_METRIC); SWIFT_REGISTER_QPS_METRIC(msgReadQpsWithMergedFromMemory, READ_MSG_GROUP_METRIC); SWIFT_REGISTER_QPS_METRIC(msgReadRateFromMemory, READ_MSG_GROUP_METRIC); // fs message reader SWIFT_REGISTER_QPS_METRIC(partitionReadFileMetaFailedQps, PARTITION_DFS_GROUP_METRIC); SWIFT_REGISTER_QPS_METRIC(partitionReadFileDataFailedQps, PARTITION_DFS_GROUP_METRIC); SWIFT_REGISTER_QPS_METRIC(partitionReadMetaRateFromDFS, PARTITION_DFS_GROUP_METRIC); SWIFT_REGISTER_QPS_METRIC(partitionReadDataRateFromDFS, PARTITION_DFS_GROUP_METRIC); SWIFT_REGISTER_QPS_METRIC(partitionActualReadMetaRateFromDFS, PARTITION_DFS_GROUP_METRIC); SWIFT_REGISTER_QPS_METRIC(partitionActualReadDataRateFromDFS, PARTITION_DFS_GROUP_METRIC); SWIFT_REGISTER_QPS_METRIC(msgReadRateFromDFS, READ_MSG_GROUP_METRIC); SWIFT_REGISTER_QPS_METRIC(msgReadQpsFromDFS, READ_MSG_GROUP_METRIC); SWIFT_REGISTER_QPS_METRIC(msgReadQpsWithMergedFromDFS, READ_MSG_GROUP_METRIC); // file cache read block SWIFT_REGISTER_QPS_METRIC(partitionReadBlockQps, PARTITION_DFS_GROUP_METRIC); SWIFT_REGISTER_QPS_METRIC(partitionReadBlockErrorQps, PARTITION_DFS_GROUP_METRIC); SWIFT_REGISTER_GAUGE_METRIC(partitionBlockCacheHitRatio, PARTITION_DFS_GROUP_METRIC); SWIFT_REGISTER_QPS_METRIC(partitionDfsActualReadRate, PARTITION_DFS_GROUP_METRIC); SWIFT_REGISTER_QPS_METRIC(partitionReadDataBlockQps, PARTITION_DFS_GROUP_METRIC); SWIFT_REGISTER_QPS_METRIC(partitionReadDataBlockErrorQps, PARTITION_DFS_GROUP_METRIC); SWIFT_REGISTER_GAUGE_METRIC(partitionDataBlockCacheHitRatio, PARTITION_DFS_GROUP_METRIC); SWIFT_REGISTER_QPS_METRIC(partitionDfsActualReadDataRate, PARTITION_DFS_GROUP_METRIC); SWIFT_REGISTER_QPS_METRIC(partitionReadMetaBlockQps, PARTITION_DFS_GROUP_METRIC); SWIFT_REGISTER_QPS_METRIC(partitionReadMetaBlockErrorQps, PARTITION_DFS_GROUP_METRIC); SWIFT_REGISTER_GAUGE_METRIC(partitionMetaBlockCacheHitRatio, PARTITION_DFS_GROUP_METRIC); SWIFT_REGISTER_QPS_METRIC(partitionDfsActualReadMetaRate, PARTITION_DFS_GROUP_METRIC); SWIFT_REGISTER_GAUGE_METRIC(actualReadBlockCount, PARTITION_DFS_GROUP_METRIC); return true; } void ReadMetrics::report(const MetricsTags *tags, ReadMetricsCollector *collector) { // broker partition SWIFT_REPORT_QPS_METRIC(readRequestQps); SWIFT_REPORT_MUTABLE_METRIC(requiredMsgsCountPerReadRequest, collector->requiredMsgsCountPerReadRequest); if (collector->readBusyError) { SWIFT_REPORT_QPS_METRIC(readBusyErrorQps); } if (collector->messageLostError) { SWIFT_REPORT_QPS_METRIC(messageLostErrorQps); } if (collector->brokerNodataError) { SWIFT_REPORT_QPS_METRIC(brokerNodataErrorQps); } if (collector->otherReadError) { SWIFT_REPORT_QPS_METRIC(otherReadErrorQps); } SWIFT_REPORT_MUTABLE_METRIC(readRequestLatency, collector->readRequestLatency); SWIFT_REPORT_MUTABLE_METRIC(readMsgDecompressedRatio, collector->readMsgDecompressedRatio); SWIFT_REPORT_MUTABLE_METRIC(readMsgDecompressedLatency, collector->readMsgDecompressedLatency); SWIFT_REPORT_MUTABLE_METRIC(requestCompressedRatio, collector->requestCompressedRatio); SWIFT_REPORT_MUTABLE_METRIC(msgCompressedLatency, collector->msgCompressedLatency); SWIFT_REPORT_MUTABLE_METRIC(unpackMsgQPS, collector->unpackMsgQPS); // brain SWIFT_REPORT_MUTABLE_METRIC(readMsgQPS, collector->readMsgCount); SWIFT_REPORT_MUTABLE_METRIC(readMsgQPS_PB, collector->readMsgCount_PB); SWIFT_REPORT_MUTABLE_METRIC(readMsgQPS_FB, collector->readMsgCount_FB); SWIFT_REPORT_MUTABLE_METRIC(returnedMsgSize, collector->returnedMsgSize); SWIFT_REPORT_MUTABLE_METRIC(returnedMsgsSizePerReadRequest, collector->returnedMsgsSizePerReadRequest); SWIFT_REPORT_MUTABLE_METRIC(returnedMsgsCountPerReadRequest, collector->returnedMsgsCountPerReadRequest); SWIFT_REPORT_MUTABLE_METRIC(returnedMsgsCountWithMergePerReadRequest, collector->returnedMsgsCountWithMergePerReadRequest); SWIFT_REPORT_MUTABLE_METRIC(mergedReadMsgQPS, collector->mergedReadMsgCount); SWIFT_REPORT_MUTABLE_METRIC(totalReadMsgQPS, collector->totalReadMsgCount); SWIFT_REPORT_MUTABLE_METRIC(readLimitQps, collector->readLimitQps); // field filter SWIFT_REPORT_MUTABLE_METRIC(fieldFilterReadMsgCountPerReadRequest, collector->fieldFilterReadMsgCountPerReadRequest); SWIFT_REPORT_MUTABLE_METRIC(fieldFilterUpdatedMsgRatio, collector->fieldFilterUpdatedMsgRatio); SWIFT_REPORT_MUTABLE_METRIC(fieldFilteredMsgSize, collector->fieldFilteredMsgSize); SWIFT_REPORT_MUTABLE_METRIC(rawFieldFilteredMsgSize, collector->rawFieldFilteredMsgSize); SWIFT_REPORT_MUTABLE_METRIC(fieldFilteredMsgRatio, collector->fieldFilteredMsgRatio); SWIFT_REPORT_MUTABLE_METRIC(fieldFilterMsgsSizePerReadRequest, collector->fieldFilterMsgsSizePerReadRequest); // group SWIFT_REPORT_MUTABLE_METRIC(msgReadQpsFromMemory, collector->msgReadQpsFromMemory); SWIFT_REPORT_MUTABLE_METRIC(msgReadQpsWithMergedFromMemory, collector->msgReadQpsWithMergedFromMemory); SWIFT_REPORT_MUTABLE_METRIC(msgReadRateFromMemory, collector->msgReadRateFromMemory); // fs message reader if (collector->partitionReadFileMetaFailed) { SWIFT_REPORT_QPS_METRIC(partitionReadFileMetaFailedQps); } if (collector->partitionReadFileDataFailed) { SWIFT_REPORT_QPS_METRIC(partitionReadFileDataFailedQps); } SWIFT_REPORT_MUTABLE_METRIC(partitionReadMetaRateFromDFS, collector->partitionReadMetaRateFromDFS); SWIFT_REPORT_MUTABLE_METRIC(partitionReadDataRateFromDFS, collector->partitionReadDataRateFromDFS); SWIFT_REPORT_MUTABLE_METRIC(partitionActualReadMetaRateFromDFS, collector->partitionActualReadMetaRateFromDFS); SWIFT_REPORT_MUTABLE_METRIC(partitionActualReadDataRateFromDFS, collector->partitionActualReadDataRateFromDFS); SWIFT_REPORT_MUTABLE_METRIC(msgReadRateFromDFS, collector->msgReadRateFromDFS); SWIFT_REPORT_MUTABLE_METRIC(msgReadQpsFromDFS, collector->msgReadQpsFromDFS); SWIFT_REPORT_MUTABLE_METRIC(msgReadQpsWithMergedFromDFS, collector->msgReadQpsWithMergedFromDFS); // file cache read block if (collector->partitionReadBlockQps > 0) { REPORT_MUTABLE_METRIC(partitionReadBlockQps, collector->partitionReadBlockQps); } if (collector->partitionReadBlockErrorQps > 0) { REPORT_MUTABLE_METRIC(partitionReadBlockErrorQps, collector->partitionReadBlockErrorQps); } if (collector->partitionReadDataBlockQps > 0) { REPORT_MUTABLE_METRIC(partitionReadDataBlockQps, collector->partitionReadDataBlockQps); } if (collector->partitionReadDataBlockErrorQps > 0) { REPORT_MUTABLE_METRIC(partitionReadDataBlockErrorQps, collector->partitionReadDataBlockErrorQps); } if (collector->partitionReadMetaBlockQps > 0) { REPORT_MUTABLE_METRIC(partitionReadMetaBlockQps, collector->partitionReadMetaBlockQps); } if (collector->partitionReadMetaBlockErrorQps > 0) { REPORT_MUTABLE_METRIC(partitionReadMetaBlockErrorQps, collector->partitionReadMetaBlockErrorQps); } if (collector->partitionDfsActualReadRate > 0) { REPORT_MUTABLE_METRIC(partitionDfsActualReadRate, collector->partitionDfsActualReadRate); } if (collector->partitionDfsActualReadDataRate > 0) { REPORT_MUTABLE_METRIC(partitionDfsActualReadDataRate, collector->partitionDfsActualReadDataRate); } if (collector->partitionDfsActualReadMetaRate > 0) { REPORT_MUTABLE_METRIC(partitionDfsActualReadMetaRate, collector->partitionDfsActualReadMetaRate); } if (collector->actualReadBlockCount > 0) { REPORT_MUTABLE_METRIC(actualReadBlockCount, collector->actualReadBlockCount); } for (uint32_t i = 0; i < collector->partitionBlockCacheHitTimes; ++i) { REPORT_MUTABLE_METRIC(partitionBlockCacheHitRatio, 100.0); } for (uint32_t i = 0; i < collector->partitionBlockCacheNotHitTimes; ++i) { REPORT_MUTABLE_METRIC(partitionBlockCacheHitRatio, 0); } for (uint32_t i = 0; i < collector->partitionDataBlockCacheHitTimes; ++i) { REPORT_MUTABLE_METRIC(partitionDataBlockCacheHitRatio, 100.0); } for (uint32_t i = 0; i < collector->partitionDataBlockCacheNotHitTimes; ++i) { REPORT_MUTABLE_METRIC(partitionDataBlockCacheHitRatio, 0); } for (uint32_t i = 0; i < collector->partitionMetaBlockCacheHitTimes; ++i) { REPORT_MUTABLE_METRIC(partitionMetaBlockCacheHitRatio, 100.0); } for (uint32_t i = 0; i < collector->partitionMetaBlockCacheNotHitTimes; ++i) { REPORT_MUTABLE_METRIC(partitionMetaBlockCacheHitRatio, 0); } } bool CommitFileMetrics::init(MetricsGroupManager *manager) { SWIFT_REGISTER_GAUGE_METRIC(dfsFlushDataLatency, PARTITION_DFS_GROUP_METRIC); SWIFT_REGISTER_GAUGE_METRIC(dfsFlushMetaLatency, PARTITION_DFS_GROUP_METRIC); SWIFT_REGISTER_GAUGE_METRIC(dfsCommitLatency, PARTITION_DFS_GROUP_METRIC); SWIFT_REGISTER_GAUGE_METRIC(dfsCommitSize, PARTITION_DFS_GROUP_METRIC); SWIFT_REGISTER_QPS_METRIC(dfsCommitQps, PARTITION_DFS_GROUP_METRIC); SWIFT_REGISTER_GAUGE_METRIC(dfsCommitInterval, PARTITION_DFS_GROUP_METRIC); return true; } void CommitFileMetrics::report(const MetricsTags *tags, CommitFileMetricsCollector *collector) { SWIFT_REPORT_MUTABLE_METRIC(dfsFlushDataLatency, collector->dfsFlushDataLatency); SWIFT_REPORT_MUTABLE_METRIC(dfsFlushMetaLatency, collector->dfsFlushMetaLatency); SWIFT_REPORT_MUTABLE_METRIC(dfsCommitLatency, collector->dfsCommitLatency); SWIFT_REPORT_MUTABLE_METRIC(dfsCommitSize, collector->dfsCommitSize); SWIFT_REPORT_MUTABLE_METRIC(dfsCommitQps, collector->dfsCommitQps); SWIFT_REPORT_MUTABLE_METRIC(dfsCommitInterval, collector->dfsCommitInterval); } bool RecycleBlockMetrics::init(MetricsGroupManager *manager) { SWIFT_REGISTER_QPS_METRIC(recycleObsoleteMetaBlockCount, PARTITION_GROUP_METRIC); SWIFT_REGISTER_QPS_METRIC(recycleObsoleteDataBlockCount, PARTITION_GROUP_METRIC); SWIFT_REGISTER_GAUGE_METRIC(recycleObsoleteBlockLatency, PARTITION_GROUP_METRIC); SWIFT_REGISTER_QPS_METRIC(recycleObsoleteBlockCount, PARTITION_GROUP_METRIC); SWIFT_REGISTER_QPS_METRIC(recycleByDisMetaBlockCount, PARTITION_GROUP_METRIC); SWIFT_REGISTER_QPS_METRIC(recycleByDisDataBlockCount, PARTITION_GROUP_METRIC); SWIFT_REGISTER_GAUGE_METRIC(recycleByDisLatency, PARTITION_GROUP_METRIC); SWIFT_REGISTER_QPS_METRIC(recycleByDisBlockCount, PARTITION_GROUP_METRIC); SWIFT_REGISTER_GAUGE_METRIC(recycleByDisMetaThreshold, PARTITION_GROUP_METRIC); SWIFT_REGISTER_GAUGE_METRIC(recycleByDisDataThreshold, PARTITION_GROUP_METRIC); SWIFT_REGISTER_QPS_METRIC(recycleBlockCount, PARTITION_GROUP_METRIC); SWIFT_REGISTER_QPS_METRIC(recycleBlockCacheQps, PARTITION_GROUP_METRIC); SWIFT_REGISTER_GAUGE_METRIC(recycleBlockCacheLatency, PARTITION_GROUP_METRIC); return true; } void RecycleBlockMetrics::report(const MetricsTags *tags, RecycleBlockMetricsCollector *collector) { SWIFT_REPORT_MUTABLE_METRIC(recycleObsoleteMetaBlockCount, collector->recycleObsoleteMetaBlockCount); SWIFT_REPORT_MUTABLE_METRIC(recycleObsoleteDataBlockCount, collector->recycleObsoleteDataBlockCount); SWIFT_REPORT_MUTABLE_METRIC(recycleObsoleteBlockLatency, collector->recycleObsoleteBlockLatency); SWIFT_REPORT_MUTABLE_METRIC(recycleObsoleteBlockCount, collector->recycleObsoleteBlockCount); SWIFT_REPORT_MUTABLE_METRIC(recycleByDisMetaBlockCount, collector->recycleByDisMetaBlockCount); SWIFT_REPORT_MUTABLE_METRIC(recycleByDisDataBlockCount, collector->recycleByDisDataBlockCount); SWIFT_REPORT_MUTABLE_METRIC(recycleByDisLatency, collector->recycleByDisLatency); SWIFT_REPORT_MUTABLE_METRIC(recycleByDisBlockCount, collector->recycleByDisBlockCount); SWIFT_REPORT_MUTABLE_METRIC(recycleByDisMetaThreshold, collector->recycleByDisMetaThreshold); SWIFT_REPORT_MUTABLE_METRIC(recycleByDisDataThreshold, collector->recycleByDisDataThreshold); SWIFT_REPORT_MUTABLE_METRIC(recycleBlockCount, collector->recycleBlockCount); SWIFT_REPORT_MUTABLE_METRIC(recycleBlockCacheQps, collector->recycleBlockCacheQps); SWIFT_REPORT_MUTABLE_METRIC(recycleBlockCacheLatency, collector->recycleBlockCacheLatency); } bool AccessInfoMetrics::init(MetricsGroupManager *manager) { SWIFT_REGISTER_QPS_METRIC(requestLength, BROKER_WORKER_GROUP_METRIC); SWIFT_REGISTER_QPS_METRIC(responseLength, BROKER_WORKER_GROUP_METRIC); SWIFT_REGISTER_QPS_METRIC(accessQps, PARTITION_GROUP_METRIC); return true; } void AccessInfoMetrics::report(const MetricsTags *tags, AccessInfoCollector *collector) { SWIFT_REPORT_MUTABLE_METRIC(requestLength, collector->requestLength); SWIFT_REPORT_MUTABLE_METRIC(responseLength, collector->responseLength); SWIFT_REPORT_MUTABLE_METRIC(accessQps, 1); } //////partition////// void BrokerMetricsReporter::reportWriteMetricsBackupThread(const WriteMetricsCollectorPtr &collector, const MetricsTagsPtr &tags) { if (collector == nullptr) { return; } if (_threadPool) { auto item = new ReportWriteInfoWorkItem(collector, this, tags); if (autil::ThreadPool::ERROR_NONE != _threadPool->pushWorkItem(item, false)) { item->process(); item->destroy(); } } else { reportWriteMetrics(*collector, tags); } } void BrokerMetricsReporter::reportWriteMetrics(WriteMetricsCollector &collector, const MetricsTagsPtr &tags) { auto tag = tags ? tags : getPartitionInfoMetricsTags(collector.topicName, intToString(collector.partId), collector.accessId); _metricsReporter->report<WriteMetrics, WriteMetricsCollector>(tag.get(), &collector); } void BrokerMetricsReporter::reportReadMetricsBackupThread(const ReadMetricsCollectorPtr &collector, const MetricsTagsPtr &tags) { if (collector == nullptr) { return; } if (_threadPool) { auto item = new ReportReadInfoWorkItem(collector, this, tags); if (autil::ThreadPool::ERROR_NONE != _threadPool->pushWorkItem(item, false)) { item->process(); item->destroy(); } } else { reportReadMetrics(*collector, tags); } } void BrokerMetricsReporter::reportReadMetrics(ReadMetricsCollector &collector, const MetricsTagsPtr &tags) { auto tag = tags ? tags : getPartitionInfoMetricsTags(collector.topicName, intToString(collector.partId), collector.accessId); _metricsReporter->report<ReadMetrics, ReadMetricsCollector>(tag.get(), &collector); } void BrokerMetricsReporter::reportAccessInfoMetricsBackupThread(const AccessInfoCollectorPtr &collector) { if (collector == nullptr) { return; } if (_threadPool) { auto item = new ReportAccessInfoWorkItem(collector, this); if (autil::ThreadPool::ERROR_NONE != _threadPool->pushWorkItem(item, false)) { item->process(); item->destroy(); } } else { reportAccessInfoMetrics(*collector); } } void BrokerMetricsReporter::reportAccessInfoMetrics(AccessInfoCollector &collector) { auto tags = getAccessInfoMetricsTags(collector); _metricsReporter->report<AccessInfoMetrics, AccessInfoCollector>(tags.get(), &collector); } void BrokerMetricsReporter::reportLongPollingMetrics(const MetricsTagsPtr &tags, LongPollingMetricsCollector &collector) { _metricsReporter->report<LongPollingMetrics, LongPollingMetricsCollector>(tags.get(), &collector); } void BrokerMetricsReporter::reportFileManagerMetrics(const MetricsTagsPtr &tags, FileManagerMetricsCollector &collector) { _metricsReporter->report<FileManagerMetrics, FileManagerMetricsCollector>(tags.get(), &collector); } void BrokerMetricsReporter::incGetMinMessageIdByTimeQps(MetricsTags *tags) { static const string name = PARTITION_GROUP_METRIC + "getMinMessageIdByTimeQps"; REPORT_USER_MUTABLE_QPS_TAGS(_metricsReporter, name, tags); } void BrokerMetricsReporter::reportGetMinMessageIdByTimeLatency(double latency, MetricsTags *tags) { static const string name = PARTITION_GROUP_METRIC + "getMinMessageIdByTimeLatency"; REPORT_USER_MUTABLE_METRIC_TAGS(_metricsReporter, name, latency, tags); } void BrokerMetricsReporter::reportClientCommitQps(MetricsTags *tags) { static const string name = PARTITION_DFS_GROUP_METRIC + "clientCommitQps"; REPORT_USER_MUTABLE_QPS_TAGS(_metricsReporter, name, tags); } void BrokerMetricsReporter::reportClientCommitDelay(double delay, MetricsTags *tags) { static const string name = PARTITION_DFS_GROUP_METRIC + "clientCommitDelay"; REPORT_USER_MUTABLE_METRIC_TAGS(_metricsReporter, name, delay, tags); } void BrokerMetricsReporter::reportRecycleWriteCacheByReaderSize(int64_t recycleSize, MetricsTags *tags) { static const string name = PARTITION_MEMORY_GROUP_METRIC + "partitionRecycleWriteCacheByReaderSize"; REPORT_USER_MUTABLE_METRIC_TAGS(_metricsReporter, name, recycleSize, tags); } void BrokerMetricsReporter::reportRecycleWriteCacheByForceSize(int64_t recycleSize, MetricsTags *tags) { static const string name = PARTITION_MEMORY_GROUP_METRIC + "partitionRecycleWriteCacheByForceSize"; REPORT_USER_MUTABLE_METRIC_TAGS(_metricsReporter, name, recycleSize, tags); } void BrokerMetricsReporter::incRecycleQps(MetricsTags *tags) { static const string name = PARTITION_MEMORY_GROUP_METRIC + "partitionRecycleQps"; REPORT_USER_MUTABLE_QPS_TAGS(_metricsReporter, name, tags); } void BrokerMetricsReporter::reportLongPollingQps(const kmonitor::MetricsTagsPtr &tags) { static const string name = BROKER_WORKER_GROUP_STATUS_METRIC + "longPollingQps"; REPORT_USER_MUTABLE_QPS_TAGS(_metricsReporter, name, tags.get()); } void BrokerMetricsReporter::reportDeleteObsoleteFileQps(uint32_t deletedFileCount, const kmonitor::MetricsTags *tags) { static const string name = PARTITION_GROUP_METRIC + "deleteObsoleteFileQps"; REPORT_USER_MUTABLE_QPSN_TAGS(_metricsReporter, name, deletedFileCount, tags); } void BrokerMetricsReporter::reportInitFileManagerLatency(int64_t latency, const kmonitor::MetricsTags *tags) { static const string name = PARTITION_GROUP_METRIC + "initFileManagerLatency"; REPORT_USER_MUTABLE_METRIC_TAGS(_metricsReporter, name, latency, tags); } /////// worker metrics void BrokerMetricsReporter::reportBrokerSysMetrics(BrokerSysMetricsCollector &collector) { _metricsReporter->report<BrokerSysMetrics, BrokerSysMetricsCollector>(nullptr, &collector); } void BrokerMetricsReporter::reportWorkerMetrics(WorkerMetricsCollector &collector) { _metricsReporter->report<WorkerMetrics, WorkerMetricsCollector>(nullptr, &collector); // partition metrics for (auto &partCollector : collector.partMetricsCollectors) { MetricsTags tags; tags.AddTag("topic", partCollector.topicName); tags.AddTag("partition", intToString(partCollector.partId)); _metricsReporter->report<PartitionMetrics, PartitionMetricsCollector>(&tags, &partCollector); } } void BrokerMetricsReporter::reportBrokerInStatus(const string &broker, BrokerInStatusCollector &collector) { string newName(broker); autil::StringUtil::replace(newName, '#', '_'); MetricsTags tags("broker", newName); _metricsReporter->report<BrokerInStatusMetrics, BrokerInStatusCollector>(&tags, &collector); } void BrokerMetricsReporter::reportWorkerLoadPartitionQps() { static const string name = BROKER_WORKER_GROUP_METRIC + "loadPartitionQps"; REPORT_USER_MUTABLE_QPS(_metricsReporter, name); } void BrokerMetricsReporter::reportWorkerUnloadPartitionQps() { static const string name = BROKER_WORKER_GROUP_METRIC + "unloadPartitionQps"; REPORT_USER_MUTABLE_QPS(_metricsReporter, name); } void BrokerMetricsReporter::reportWorkerLoadPartitionLatency(double latency) { static const string name = BROKER_WORKER_GROUP_METRIC + "loadPartitionLatency"; REPORT_USER_MUTABLE_METRIC(_metricsReporter, name, latency); } void BrokerMetricsReporter::reportWorkerUnloadPartitionLatency(double latency) { static const string name = BROKER_WORKER_GROUP_METRIC + "unloadPartitionLatency"; REPORT_USER_MUTABLE_METRIC(_metricsReporter, name, latency); } void BrokerMetricsReporter::reportRecycleWriteCacheBlockCount(int64_t count) { static const string name = BROKER_WORKER_GROUP_METRIC + "recycleWriteCacheBlockCount"; REPORT_USER_MUTABLE_METRIC(_metricsReporter, name, count); } void BrokerMetricsReporter::reportRecycleWriteCacheLatency(int64_t latency) { static const string name = BROKER_WORKER_GROUP_METRIC + "recycleWriteCacheLatency"; REPORT_USER_MUTABLE_METRIC(_metricsReporter, name, latency); } void BrokerMetricsReporter::reportRecycleFileCacheBlockCount(int64_t count) { static const string name = BROKER_WORKER_GROUP_METRIC + "recycleFileCacheBlockCount"; REPORT_USER_MUTABLE_METRIC(_metricsReporter, name, count); } void BrokerMetricsReporter::reportRecycleFileCacheLatency(int64_t latency) { static const string name = BROKER_WORKER_GROUP_METRIC + "recycleFileCacheLatency"; REPORT_USER_MUTABLE_METRIC(_metricsReporter, name, latency); } void BrokerMetricsReporter::reportRecycleObsoleteReaderLatency(int64_t latency) { static const string name = BROKER_WORKER_GROUP_METRIC + "recycleObsoleteReaderLatency"; REPORT_USER_MUTABLE_METRIC(_metricsReporter, name, latency); } void BrokerMetricsReporter::reportRecycleByDisMetaThreshold(int64_t threshold) { static const string name = BROKER_WORKER_GROUP_METRIC + "recycleByDisMetaThreshold"; REPORT_USER_MUTABLE_METRIC(_metricsReporter, name, threshold); } void BrokerMetricsReporter::reportRecycleByDisDataThreshold(int64_t threshold) { static const string name = BROKER_WORKER_GROUP_METRIC + "recycleByDisDataThreshold"; REPORT_USER_MUTABLE_METRIC(_metricsReporter, name, threshold); } /////dfs void BrokerMetricsReporter::reportCommitFileMetrics(CommitFileMetricsCollector &collector) { MetricsTags tags; tags.AddTag("topic", collector.topicName); tags.AddTag("partition", intToString(collector.partId)); _metricsReporter->report<CommitFileMetrics, CommitFileMetricsCollector>(&tags, &collector); } void BrokerMetricsReporter::reportDfsWriteDataLatency(double latency, MetricsTags *tags) { static const string name = PARTITION_DFS_GROUP_METRIC + "dfsWriteDataLatency"; REPORT_USER_MUTABLE_METRIC_TAGS(_metricsReporter, name, latency, tags); } void BrokerMetricsReporter::reportDfsWriteMetaLatency(double latency, MetricsTags *tags) { static const string name = PARTITION_DFS_GROUP_METRIC + "dfsWriteMetaLatency"; REPORT_USER_MUTABLE_METRIC_TAGS(_metricsReporter, name, latency, tags); } void BrokerMetricsReporter::reportDFSWriteRate(uint64_t size, MetricsTags *tags) { static const string name = PARTITION_DFS_GROUP_METRIC + "dfsWriteRate"; REPORT_USER_MUTABLE_QPSN_TAGS(_metricsReporter, name, size, tags); } //////cache void BrokerMetricsReporter::reportRecycleBlockMetrics(RecycleBlockMetricsCollector &collector) { MetricsTags tags; tags.AddTag("topic", collector.topicName); tags.AddTag("partition", intToString(collector.partId)); _metricsReporter->report<RecycleBlockMetrics, RecycleBlockMetricsCollector>(&tags, &collector); } void BrokerMetricsReporter::incRecycleFileCacheQps(MetricsTags *tags) { static const string name = PARTITION_GROUP_METRIC + "recycleFileCacheQps"; REPORT_USER_MUTABLE_QPS_TAGS(_metricsReporter, name, tags); } void BrokerMetricsReporter::reportDfsReadOneBlockLatency(double latency, MetricsTags *tags) { static const string name = PARTITION_GROUP_METRIC + "dfsReadOneBlockLatency"; REPORT_USER_MUTABLE_METRIC_TAGS(_metricsReporter, name, latency, tags); } MetricsTagsPtr BrokerMetricsReporter::getAccessInfoMetricsTags(AccessInfoCollector &collector) { string key; key.reserve(256); key.append(collector.topicName); key.append(intToString(collector.partId)); key.append(collector.clientVersion); key.append(collector.clientType); key.append(collector.srcDrc); key.append(collector.accessType); uint32_t hashKey = autil::HashAlgorithm::hashString(key.c_str(), key.size(), 0); { ScopedReadLock lock(_accessInfoMapMutex); auto iter = _accessInfoTagsMap.find(hashKey); if (iter != _accessInfoTagsMap.end()) { return iter->second; } } MetricsTagsPtr tags(new MetricsTags({{"topic", collector.topicName}, {"partition", intToString(collector.partId)}, {"clientVersion", collector.clientVersion}, {"clientType", collector.clientType}, {"srcDrc", collector.srcDrc}, {"type", collector.accessType}})); { ScopedWriteLock lock(_accessInfoMapMutex); if (_accessInfoTagsMap.size() > CLEAR_TAGS_MAP_THRESHOLD) { _accessInfoTagsMap.clear(); } _accessInfoTagsMap[hashKey] = tags; } return tags; } kmonitor::MetricsTagsPtr BrokerMetricsReporter::getPartitionInfoMetricsTags(const std::string &topicName, const std::string &partId, const std::string &accessId) { string key; key.reserve(256); key.append(topicName); key.append(partId); key.append(accessId); uint32_t hashKey = autil::HashAlgorithm::hashString(key.c_str(), key.size(), 0); { ScopedReadLock lock(_partitionInfoMapMutex); auto iter = _partitionInfoTagsMap.find(hashKey); if (iter != _partitionInfoTagsMap.end()) { return iter->second; } } MetricsTagsPtr tags(new MetricsTags({{"topic", topicName}, {"partition", partId}, {"access_id", accessId}})); { ScopedWriteLock lock(_partitionInfoMapMutex); _partitionInfoTagsMap[hashKey] = tags; } return tags; } bool FileManagerMetrics::init(MetricsGroupManager *manager) { SWIFT_REGISTER_GAUGE_METRIC(getPathMetaCount, BROKER_WORKER_GROUP_STATUS_METRIC); SWIFT_REGISTER_GAUGE_METRIC(getPathMetaLatency, BROKER_WORKER_GROUP_STATUS_METRIC); return true; } void FileManagerMetrics::report(const MetricsTags *tags, FileManagerMetricsCollector *collector) { SWIFT_REPORT_MUTABLE_METRIC(getPathMetaCount, collector->getPathMetaCount); SWIFT_REPORT_MUTABLE_METRIC(getPathMetaLatency, collector->getPathMetaLatency); } } // namespace monitor } // namespace swift