cpp/writer/PacketReassembler.cpp (101 lines of code) (raw):
/**
* Copyright 2004-present, Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "PacketReassembler.h"
#include <algorithm>
namespace facebook {
namespace profilo {
namespace writer {
using detail::PacketStream;
PacketReassembler::PacketReassembler(
PacketReassembler::PayloadCallback callback)
: active_streams_(),
pooled_streams_(kStreamPoolSize),
callback_(std::move(callback)) {}
namespace {
inline void appendToStream(PacketStream& stream, Packet const& packet) {
auto prev_size = stream.data.size();
auto next_size = prev_size + packet.size;
stream.data.resize(next_size);
char* data = static_cast<char*>(stream.data.data());
std::memcpy(data + prev_size, packet.data, packet.size);
}
inline void appendToStreamReverse(PacketStream& stream, Packet const& packet) {
auto prev_size = stream.data.size();
auto next_size = prev_size + packet.size;
stream.data.resize(next_size);
char* data = static_cast<char*>(stream.data.data());
std::memcpy(data + prev_size, packet.data, packet.size);
std::reverse(data + prev_size, data + prev_size + packet.size);
}
} // anonymous namespace
PacketStream PacketReassembler::newStream() {
if (!pooled_streams_.empty()) {
// Take a pooled stream and remove the moved-from instance.
PacketStream stream = std::move(*pooled_streams_.begin());
pooled_streams_.pop_front();
return stream;
} else {
// No pooled stream to use, make a new one.
return PacketStream{};
}
}
void PacketReassembler::recycleStream(PacketStream stream) {
// Return to pool, if necessary. Otherwise, release via RAII.
if (pooled_streams_.size() < kStreamPoolSize) {
// Changes the `size` to 0 but the `capacity` will not be affected.
stream.data.resize(0);
pooled_streams_.push_back(std::move(stream));
}
}
void PacketReassembler::process(Packet const& packet) {
//
// Collect packets into active_streams_, inside PacketStream objects.
//
// Last packet within the stream flushes to the callback.
// The PacketStream object can then be moved to pooled_stream_ for reuse.
//
// Is this part of an existing stream?
if (active_streams_.size() > 0) {
for (auto it = active_streams_.begin(), end = active_streams_.end();
it != end;
++it) {
auto& stream = *it;
if (stream.stream == packet.stream) {
appendToStream(stream, packet);
if (!packet.next) {
// Flush the stream
callback_(stream.data.data(), stream.data.size());
PacketStream temp_stream = std::move(*it);
active_streams_.erase(it); // remove moved-from instance
recycleStream(std::move(temp_stream));
}
return; // packet is handled
}
}
}
if (packet.start && !packet.next) {
callback_(packet.data, packet.size);
} else if (packet.start) { // Ignore if we only started from the middle of the
// packet
PacketStream stream = newStream();
stream.stream = packet.stream;
appendToStream(stream, packet);
active_streams_.push_front(std::move(stream));
}
}
void PacketReassembler::processBackwards(Packet const& packet) {
//
// Collect packets into active_streams_, inside PacketStream objects.
//
// Last packet within the stream flushes to the callback.
// The PacketStream object can then be moved to pooled_stream_ for reuse.
//
// Is this part of an existing stream?
if (active_streams_.size() > 0) {
for (auto it = active_streams_.begin(), end = active_streams_.end();
it != end;
++it) {
auto& stream = *it;
if (stream.stream == packet.stream) {
appendToStreamReverse(stream, packet);
if (packet.start) {
// Flush the stream
std::reverse(stream.data.begin(), stream.data.end());
callback_(stream.data.data(), stream.data.size());
PacketStream temp_stream = std::move(*it);
active_streams_.erase(it); // remove moved-from instance
recycleStream(std::move(temp_stream));
}
return; // packet is handled
}
}
}
if (packet.start && !packet.next) {
callback_(packet.data, packet.size);
} else if (!packet.next) { // Ignore if we only started from the middle of the
// packet
PacketStream stream = newStream();
stream.stream = packet.stream;
appendToStreamReverse(stream, packet);
active_streams_.push_front(std::move(stream));
}
}
} // namespace writer
} // namespace profilo
} // namespace facebook