libminifi/include/core/FlowFile.h (122 lines of code) (raw):
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <array>
#include <chrono>
#include <cstdint>
#include <map>
#include <memory>
#include <optional>
#include <set>
#include <string>
#include <string_view>
#include <unordered_set>
#include <utility>
#include <vector>
#include "utils/TimeUtil.h"
#include "ResourceClaim.h"
#include "Connectable.h"
#include "WeakReference.h"
#include "utils/FlatMap.h"
#include "utils/Export.h"
namespace org::apache::nifi::minifi::core {
// Forward declaration: FlowFile only refers to Connectable through raw pointers
// (see setConnection/getConnection and the connection_ member).
class Connectable;
class FlowFile : public CoreComponent, public ReferenceContainer {
public:
FlowFile();
FlowFile& operator=(const FlowFile& other);
using AttributeMap = utils::FlatMap<std::string, std::string>;
/**
* Returns a pointer to this flow file record's
* claim
*/
[[nodiscard]] std::shared_ptr<ResourceClaim> getResourceClaim() const;
/**
* Sets _claim to the inbound claim argument
*/
void setResourceClaim(const std::shared_ptr<ResourceClaim>& claim);
/**
* clear the resource claim
*/
void clearResourceClaim();
/**
* Returns a pointer to this flow file record's
* claim at the given stash key
*/
std::shared_ptr<ResourceClaim> getStashClaim(const std::string& key);
/**
* Sets the given stash key to the inbound claim argument
*/
void setStashClaim(const std::string& key, const std::shared_ptr<ResourceClaim>& claim);
/**
* Clear the resource claim at the given stash key
*/
void clearStashClaim(const std::string& key);
/**
* Return true if the given stash claim exists
*/
bool hasStashClaim(const std::string& key);
/**
* Get lineage identifiers
*/
std::vector<utils::Identifier> &getlineageIdentifiers();
/**
* Returns whether or not this flow file record
* is marked as deleted.
* @return marked deleted
*/
[[nodiscard]] bool isDeleted() const;
/**
* Sets whether to mark this flow file record
* as deleted
* @param deleted deleted flag
*/
void setDeleted(bool deleted);
/**
* Get entry date for this record
* @return entry date uint64_t
*/
[[nodiscard]] std::chrono::system_clock::time_point getEntryDate() const;
/**
* Gets the event time.
* @return event time.
*/
[[nodiscard]] std::chrono::system_clock::time_point getEventTime() const;
/**
* Get lineage start date
* @return lineage start date uint64_t
*/
[[nodiscard]] std::chrono::system_clock::time_point getlineageStartDate() const;
/**
* Sets the lineage start date
* @param date new lineage start date
*/
void setLineageStartDate(std::chrono::system_clock::time_point date);
void setLineageIdentifiers(const std::vector<utils::Identifier>& lineage_Identifiers) {
lineage_Identifiers_ = lineage_Identifiers;
}
/**
* Obtains an attribute if it exists. If it does the value is
* copied into value
* @param key key to look for
* @param value value to set
* @return result of finding key
*/
bool getAttribute(std::string_view key, std::string& value) const;
[[nodiscard]] std::optional<std::string> getAttribute(std::string_view key) const;
/**
* Updates the value in the attribute map that corresponds
* to key
* @param key attribute name
* @param value value to set to attribute name
* @return result of finding key
*/
bool updateAttribute(std::string_view key, const std::string& value);
/**
* Removes the attribute
* @param key attribute name to remove
* @return result of finding key
*/
bool removeAttribute(std::string_view key);
/**
* setAttribute, if attribute already there, update it, else, add it
*/
bool setAttribute(std::string_view key, std::string value) {
return attributes_.insert_or_assign(std::string{key}, std::move(value)).second;
}
/**
* Returns the map of attributes
* @return attributes.
*/
[[nodiscard]] std::map<std::string, std::string> getAttributes() const {
return {attributes_.begin(), attributes_.end()};
}
/**
* Returns the map of attributes
* @return attributes.
*/
AttributeMap *getAttributesPtr() {
return &attributes_;
}
/**
* adds an attribute if it does not exist
*
*/
bool addAttribute(std::string_view key, const std::string& value);
/**
* Set the size of this record.
* @param size size of record to set.
*/
void setSize(const uint64_t size) {
size_ = size;
}
/**
* Returns the size of corresponding flow file
* @return size as a uint64_t
*/
[[nodiscard]] uint64_t getSize() const;
/**
* Sets the offset
* @param offset offset to apply to this record.
*/
void setOffset(const uint64_t offset) {
offset_ = offset;
}
template<typename Rep, typename Period>
void penalize(std::chrono::duration<Rep, Period> duration) {
to_be_processed_after_ = std::chrono::steady_clock::now() + duration;
}
[[nodiscard]] std::chrono::steady_clock::time_point getPenaltyExpiration() const {
return to_be_processed_after_;
}
void setPenaltyExpiration(std::chrono::time_point<std::chrono::steady_clock> to_be_processed_after) {
to_be_processed_after_ = to_be_processed_after;
}
/**
* Gets the offset within the flow file
* @return size as a uint64_t
*/
[[nodiscard]] uint64_t getOffset() const;
[[nodiscard]] bool isPenalized() const {
return to_be_processed_after_ > std::chrono::steady_clock::now();
}
[[nodiscard]] uint64_t getId() const {
return id_;
}
/**
* Sets the original connection with a shared pointer.
* @param connection shared connection.
*/
void setConnection(core::Connectable* connection);
/**
* Returns the original connection referenced by this record.
* @return shared original connection pointer.
*/
[[nodiscard]] Connectable* getConnection() const;
void setStoredToRepository(bool storedInRepository) {
stored = storedInRepository;
}
[[nodiscard]] bool isStored() const {
return stored;
}
protected:
bool stored;
// Mark for deletion
bool marked_delete_;
// Date at which the flow file entered the flow
std::chrono::system_clock::time_point entry_date_{};
// event time
std::chrono::system_clock::time_point event_time_{};
// Date at which the origin of this flow file entered the flow
std::chrono::system_clock::time_point lineage_start_date_{};
// Date at which the flow file was queued
uint64_t last_queue_date_;
// Size in bytes of the data corresponding to this flow file
uint64_t size_;
// A global unique identifier
// A local unique identifier
uint64_t id_;
// Offset to the content
uint64_t offset_;
// Penalty expiration
std::chrono::steady_clock::time_point to_be_processed_after_;
// Attributes key/values pairs for the flow record
AttributeMap attributes_;
// Pointer to the associated content resource claim
std::shared_ptr<ResourceClaim> claim_;
// Pointers to stashed content resource claims
utils::FlatMap<std::string, std::shared_ptr<ResourceClaim>> stashedContent_;
// UUID string
// std::string uuid_str_;
// UUID string for all parents
std::vector<utils::Identifier> lineage_Identifiers_;
// Orginal connection queue that this flow file was dequeued from
core::Connectable* connection_ = nullptr;
static std::shared_ptr<logging::Logger> logger_;
static std::shared_ptr<utils::IdGenerator> id_generator_;
static std::shared_ptr<utils::NonRepeatingStringGenerator> numeric_id_generator_;
};
/**
 * Names of the special (well-known) FlowFile attributes.
 */
struct SpecialFlowAttribute {
  /// Relative directory the FlowFile belongs to; does not contain the filename.
  MINIFIAPI static constexpr std::string_view PATH{"path"};
  /// Absolute directory the FlowFile belongs to; does not contain the filename.
  MINIFIAPI static constexpr std::string_view ABSOLUTE_PATH{"absolute.path"};
  /// Filename of the FlowFile; should not contain any directory structure.
  MINIFIAPI static constexpr std::string_view FILENAME{"filename"};
  /// Unique UUID assigned to this FlowFile.
  MINIFIAPI static constexpr std::string_view UUID{"uuid"};
  /// Numeric value indicating the FlowFile priority.
  MINIFIAPI static constexpr std::string_view priority{"priority"};
  /// MIME type of this FlowFile.
  MINIFIAPI static constexpr std::string_view MIME_TYPE{"mime.type"};
  /// Reason that a FlowFile is being discarded.
  MINIFIAPI static constexpr std::string_view DISCARD_REASON{"discard.reason"};
  /// Identifier other than the FlowFile's UUID that is known to refer to this FlowFile.
  MINIFIAPI static constexpr std::string_view ALTERNATE_IDENTIFIER{"alternate.identifier"};
  /// Flow identifier.
  MINIFIAPI static constexpr std::string_view FLOW_ID{"flow.id"};

  /**
   * Lists every special attribute name declared above, in declaration order.
   * @return array holding the nine attribute names
   */
  static constexpr std::array<std::string_view, 9> getSpecialFlowAttributes() {
    constexpr std::array<std::string_view, 9> all_special_attributes{
        PATH,
        ABSOLUTE_PATH,
        FILENAME,
        UUID,
        priority,
        MIME_TYPE,
        DISCARD_REASON,
        ALTERNATE_IDENTIFIER,
        FLOW_ID};
    return all_special_attributes;
  }
};
} // namespace org::apache::nifi::minifi::core