aios/suez/table/wal/QueueWAL.cpp (59 lines of code) (raw):
/*
* Copyright 2014-present Alibaba Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "suez/table/wal/QueueWAL.h"
#include "autil/LockFreeQueue.h"
#include "autil/Log.h"
#include "autil/result/Errors.h"
#include "build_service/common_define.h"
#include "suez/table/wal/WALConfig.h"
using namespace std;
using namespace autil;
using namespace autil::result;
AUTIL_DECLARE_AND_SETUP_LOGGER(suez, QueueWAL);
namespace suez {
const string QueueWAL::QUEUE_NAME = "queue_name";
const string QueueWAL::QUEUE_SIZE = "queue_size";
QueueWAL::QueueWAL() {}
QueueWAL::~QueueWAL() {}
bool QueueWAL::init(const WALConfig &config) {
const auto &kvMap = config.sinkDescription;
string queueName = build_service::getValueFromKeyValueMap(kvMap, QUEUE_NAME);
if (queueName.empty()) {
AUTIL_LOG(ERROR, "queue name is empty for queue_wal.");
return false;
}
_queueName = GlobalQueueManager::generateQueueName(queueName,
config.range.first, config.range.second);
_docQueuePtr = GlobalQueueManager::getInstance()->createQueue(_queueName);
if (_docQueuePtr == nullptr) {
AUTIL_LOG(ERROR, "get queue failed, queue_name[%s]", _queueName.c_str());
return false;
}
string maxQueueSizeStr = build_service::getValueFromKeyValueMap(kvMap, QUEUE_SIZE);
uint32_t maxQueueSize = 1000;
if (!maxQueueSizeStr.empty() && StringUtil::fromString(maxQueueSizeStr, maxQueueSize)) {
_maxQueueSize = maxQueueSize;
}
return true;
}
void QueueWAL::log(const vector<std::pair<uint16_t, std::string>> &strs, CallbackType done) {
if (_docQueuePtr == nullptr) {
done(RuntimeError::make("doc queue is null"));
return;
}
if ((strs.size() + _docQueuePtr->Size()) >= _maxQueueSize) {
done(RuntimeError::make("doc queue is full"));
return;
}
vector<int64_t> timestamps;
for (const auto &str : strs) {
int64_t timestamp = TimeUtility::currentTime();
RawDoc rawDoc = std::make_pair(timestamp, str.second);
_docQueuePtr->Push(rawDoc);
timestamps.push_back(timestamp);
}
done(std::move(timestamps));
}
void QueueWAL::stop() {
_docQueuePtr.reset();
GlobalQueueManager::getInstance()->releaseQueue(_queueName);
}
} // namespace suez