in libminifi/src/provenance/Provenance.cpp [244:440]
bool ProvenanceEventRecord::deserialize(io::InputStream &input_stream) {
{
const auto ret = input_stream.read(uuid_);
if (ret == 0 || io::isError(ret)) {
return false;
}
}
uint32_t eventType;
{
const auto ret = input_stream.read(eventType);
if (ret != 4) {
return false;
}
}
this->_eventType = (ProvenanceEventRecord::ProvenanceEventType) eventType;
{
uint64_t event_time_in_ms;
const auto ret = input_stream.read(event_time_in_ms);
if (ret != 8) {
return false;
}
_eventTime = std::chrono::system_clock::time_point() + std::chrono::milliseconds(event_time_in_ms);
}
{
uint64_t entry_date_in_ms;
const auto ret = input_stream.read(entry_date_in_ms);
if (ret != 8) {
return false;
}
_entryDate = std::chrono::system_clock::time_point() + std::chrono::milliseconds(entry_date_in_ms);
}
{
uint64_t event_duration_ms;
const auto ret = input_stream.read(event_duration_ms);
if (ret != 8) {
return false;
}
_eventDuration = std::chrono::milliseconds(event_duration_ms);
}
{
uint64_t lineage_start_date_in_ms;
const auto ret = input_stream.read(lineage_start_date_in_ms);
if (ret != 8) {
return false;
}
_lineageStartDate = std::chrono::system_clock::time_point() + std::chrono::milliseconds(lineage_start_date_in_ms);
}
{
const auto ret = input_stream.read(this->_componentId);
if (ret == 0 || io::isError(ret)) {
return false;
}
}
{
const auto ret = input_stream.read(this->_componentType);
if (ret == 0 || io::isError(ret)) {
return false;
}
}
{
const auto ret = input_stream.read(this->flow_uuid_);
if (ret == 0 || io::isError(ret)) {
return false;
}
}
{
const auto ret = input_stream.read(this->_details);
if (ret == 0 || io::isError(ret)) {
return false;
}
}
// read flow attributes
uint32_t numAttributes = 0;
{
const auto ret = input_stream.read(numAttributes);
if (ret != 4) {
return false;
}
}
for (uint32_t i = 0; i < numAttributes; i++) {
std::string key;
{
const auto ret = input_stream.read(key);
if (ret == 0 || io::isError(ret)) {
return false;
}
}
std::string value;
{
const auto ret = input_stream.read(value);
if (ret == 0 || io::isError(ret)) {
return false;
}
}
this->_attributes[key] = value;
}
{
const auto ret = input_stream.read(this->_contentFullPath);
if (ret == 0 || io::isError(ret)) {
return false;
}
}
{
const auto ret = input_stream.read(this->_size);
if (ret != 8) {
return false;
}
}
{
const auto ret = input_stream.read(this->_offset);
if (ret != 8) {
return false;
}
}
{
const auto ret = input_stream.read(this->_sourceQueueIdentifier);
if (ret == 0 || io::isError(ret)) {
return false;
}
}
if (this->_eventType == ProvenanceEventRecord::FORK || this->_eventType == ProvenanceEventRecord::CLONE || this->_eventType == ProvenanceEventRecord::JOIN) {
// read UUIDs
uint32_t number = 0;
{
const auto ret = input_stream.read(number);
if (ret != 4) {
return false;
}
}
for (uint32_t i = 0; i < number; i++) {
utils::Identifier parentUUID;
{
const auto ret = input_stream.read(parentUUID);
if (ret == 0 || io::isError(ret)) {
return false;
}
}
this->addParentUuid(parentUUID);
}
number = 0;
{
const auto ret = input_stream.read(number);
if (ret != 4) {
return false;
}
}
for (uint32_t i = 0; i < number; i++) {
utils::Identifier childUUID;
{
const auto ret = input_stream.read(childUUID);
if (ret == 0 || io::isError(ret)) {
return false;
}
}
this->addChildUuid(childUUID);
}
} else if (this->_eventType == ProvenanceEventRecord::SEND || this->_eventType == ProvenanceEventRecord::FETCH) {
{
const auto ret = input_stream.read(this->_transitUri);
if (ret == 0 || io::isError(ret)) {
return false;
}
}
} else if (this->_eventType == ProvenanceEventRecord::RECEIVE) {
{
const auto ret = input_stream.read(this->_transitUri);
if (ret == 0 || io::isError(ret)) {
return false;
}
}
{
const auto ret = input_stream.read(this->_sourceSystemFlowFileIdentifier);
if (ret == 0 || io::isError(ret)) {
return false;
}
}
}
return true;
}