libminifi/include/core/FlowFile.h (122 lines of code) (raw):
/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#pragma once
#include <array>
#include <chrono>
#include <cstdint>
#include <map>
#include <memory>
#include <optional>
#include <set>
#include <string>
#include <string_view>
#include <unordered_set>
#include <utility>
#include <vector>
#include "utils/TimeUtil.h"
#include "ResourceClaim.h"
#include "Connectable.h"
#include "WeakReference.h"
#include "utils/FlatMap.h"
#include "utils/Export.h"
namespace org::apache::nifi::minifi::core {
class Connectable;
/**
 * A flow file record: key/value attributes plus references to its content
 * (resource claims), lineage information, timestamps and penalty state.
 */
class FlowFile : public CoreComponent, public ReferenceContainer {
 public:
  FlowFile();
  FlowFile& operator=(const FlowFile& other);
  /// Attribute name -> attribute value storage of a flow file.
  using AttributeMap = utils::FlatMap<std::string, std::string>;
  /**
   * Returns the resource claim referencing this record's content.
   * @return the current content claim
   */
  [[nodiscard]] std::shared_ptr<ResourceClaim> getResourceClaim() const;
  /**
   * Replaces the content resource claim.
   * @param claim the new content claim
   */
  void setResourceClaim(const std::shared_ptr<ResourceClaim>& claim);
  /**
   * Clears the content resource claim.
   */
  void clearResourceClaim();
  /**
   * Returns the resource claim stashed under the given key.
   * @param key stash key
   * @return the stashed claim
   */
  std::shared_ptr<ResourceClaim> getStashClaim(const std::string& key);
  /**
   * Stashes a resource claim under the given key.
   * @param key stash key
   * @param claim claim to stash
   */
  void setStashClaim(const std::string& key, const std::shared_ptr<ResourceClaim>& claim);
  /**
   * Clears the resource claim stashed under the given key.
   * @param key stash key
   */
  void clearStashClaim(const std::string& key);
  /**
   * Checks whether a resource claim is stashed under the given key.
   * @param key stash key
   * @return true if the stash claim exists
   */
  bool hasStashClaim(const std::string& key);
  /**
   * Returns a mutable reference to the identifiers of this record's parents (lineage).
   * @return lineage identifiers
   */
  std::vector<utils::Identifier> &getlineageIdentifiers();
  /**
   * Returns whether this flow file record is marked as deleted.
   * @return marked deleted
   */
  [[nodiscard]] bool isDeleted() const;
  /**
   * Sets whether to mark this flow file record as deleted.
   * @param deleted deleted flag
   */
  void setDeleted(bool deleted);
  /**
   * Gets the date at which this record entered the flow.
   * @return entry date as a system_clock time point
   */
  [[nodiscard]] std::chrono::system_clock::time_point getEntryDate() const;
  /**
   * Gets the event time.
   * @return event time as a system_clock time point
   */
  [[nodiscard]] std::chrono::system_clock::time_point getEventTime() const;
  /**
   * Gets the date at which the origin of this record entered the flow.
   * @return lineage start date as a system_clock time point
   */
  [[nodiscard]] std::chrono::system_clock::time_point getlineageStartDate() const;
  /**
   * Sets the lineage start date.
   * @param date new lineage start date
   */
  void setLineageStartDate(std::chrono::system_clock::time_point date);
  /**
   * Replaces the lineage (parent) identifiers with a copy of the given ones.
   * @param lineage_Identifiers new lineage identifiers
   */
  void setLineageIdentifiers(const std::vector<utils::Identifier>& lineage_Identifiers) {
    lineage_Identifiers_ = lineage_Identifiers;
  }
  /**
   * Obtains an attribute if it exists. If it does, the value is
   * copied into value.
   * @param key key to look for
   * @param value receives the attribute value when found
   * @return result of finding key
   */
  bool getAttribute(std::string_view key, std::string& value) const;
  /**
   * Obtains an attribute if it exists.
   * @param key key to look for
   * @return the attribute value, or an empty optional if the key is not present
   */
  [[nodiscard]] std::optional<std::string> getAttribute(std::string_view key) const;
  /**
   * Updates the value in the attribute map that corresponds
   * to key.
   * @param key attribute name
   * @param value value to set to attribute name
   * @return result of finding key
   */
  bool updateAttribute(std::string_view key, const std::string& value);
  /**
   * Removes the attribute.
   * @param key attribute name to remove
   * @return result of finding key
   */
  bool removeAttribute(std::string_view key);
  /**
   * Sets an attribute: updates it if it already exists, adds it otherwise.
   * @param key attribute name
   * @param value attribute value (moved into the map)
   * @return the `second` of insert_or_assign: true if the attribute was newly
   *         inserted, false if an existing value was overwritten
   *         (assuming utils::FlatMap mirrors std::map semantics)
   */
  bool setAttribute(std::string_view key, std::string value) {
    return attributes_.insert_or_assign(std::string{key}, std::move(value)).second;
  }
  /**
   * Returns a copy of the attributes as an ordered std::map.
   * @return attributes.
   */
  [[nodiscard]] std::map<std::string, std::string> getAttributes() const {
    return {attributes_.begin(), attributes_.end()};
  }
  /**
   * Returns a non-owning pointer to the internal attribute map, allowing
   * in-place modification.
   * @return pointer to the attributes.
   */
  AttributeMap *getAttributesPtr() {
    return &attributes_;
  }
  /**
   * Adds an attribute if it does not exist yet.
   * @param key attribute name
   * @param value attribute value
   */
  bool addAttribute(std::string_view key, const std::string& value);
  /**
   * Set the size of this record.
   * @param size size of record to set.
   */
  void setSize(const uint64_t size) {
    size_ = size;
  }
  /**
   * Returns the size of corresponding flow file
   * @return size as a uint64_t
   */
  [[nodiscard]] uint64_t getSize() const;
  /**
   * Sets the offset
   * @param offset offset to apply to this record.
   */
  void setOffset(const uint64_t offset) {
    offset_ = offset;
  }
  /**
   * Penalizes this record until now (steady clock) + duration.
   * @param duration penalty length
   */
  template<typename Rep, typename Period>
  void penalize(std::chrono::duration<Rep, Period> duration) {
    to_be_processed_after_ = std::chrono::steady_clock::now() + duration;
  }
  /**
   * @return the steady_clock time point before which this record is penalized
   */
  [[nodiscard]] std::chrono::steady_clock::time_point getPenaltyExpiration() const {
    return to_be_processed_after_;
  }
  /**
   * Sets the penalty expiration directly.
   * @param to_be_processed_after steady_clock time point at which the penalty ends
   */
  void setPenaltyExpiration(std::chrono::steady_clock::time_point to_be_processed_after) {
    to_be_processed_after_ = to_be_processed_after;
  }
  /**
   * Gets the offset within the flow file
   * @return offset as a uint64_t
   */
  [[nodiscard]] uint64_t getOffset() const;
  /**
   * @return true while the penalty expiration lies in the future (steady clock)
   */
  [[nodiscard]] bool isPenalized() const {
    return to_be_processed_after_ > std::chrono::steady_clock::now();
  }
  /**
   * @return the local numeric identifier of this record
   */
  [[nodiscard]] uint64_t getId() const {
    return id_;
  }
  /**
   * Sets the connection this flow file was dequeued from.
   * @param connection non-owning pointer; FlowFile never deletes it.
   */
  void setConnection(core::Connectable* connection);
  /**
   * Returns the connection this flow file was dequeued from.
   * @return non-owning connection pointer.
   */
  [[nodiscard]] Connectable* getConnection() const;
  /**
   * Records whether this flow file has been stored to a repository.
   * @param storedInRepository stored flag
   */
  void setStoredToRepository(bool storedInRepository) {
    stored = storedInRepository;
  }
  /**
   * @return whether this flow file has been stored to a repository
   */
  [[nodiscard]] bool isStored() const {
    return stored;
  }
 protected:
  // Whether this record has been stored to a repository
  bool stored{false};
  // Mark for deletion
  bool marked_delete_{false};
  // Date at which the flow file entered the flow
  std::chrono::system_clock::time_point entry_date_{};
  // event time
  std::chrono::system_clock::time_point event_time_{};
  // Date at which the origin of this flow file entered the flow
  std::chrono::system_clock::time_point lineage_start_date_{};
  // Date at which the flow file was queued
  uint64_t last_queue_date_{0};
  // Size in bytes of the data corresponding to this flow file
  uint64_t size_{0};
  // A local unique identifier
  uint64_t id_{0};
  // Offset to the content
  uint64_t offset_{0};
  // Penalty expiration
  std::chrono::steady_clock::time_point to_be_processed_after_{};
  // Attributes key/values pairs for the flow record
  AttributeMap attributes_;
  // Pointer to the associated content resource claim
  std::shared_ptr<ResourceClaim> claim_;
  // Pointers to stashed content resource claims
  utils::FlatMap<std::string, std::shared_ptr<ResourceClaim>> stashedContent_;
  // UUIDs of all parents
  std::vector<utils::Identifier> lineage_Identifiers_;
  // Original connection queue that this flow file was dequeued from (non-owning)
  core::Connectable* connection_ = nullptr;
  static std::shared_ptr<logging::Logger> logger_;
  static std::shared_ptr<utils::IdGenerator> id_generator_;
  static std::shared_ptr<utils::NonRepeatingStringGenerator> numeric_id_generator_;
};
/**
 * Names of the well-known ("special") attributes a FlowFile may carry.
 */
struct SpecialFlowAttribute {
  /// Relative directory the FlowFile belongs to; never contains the filename.
  MINIFIAPI static constexpr std::string_view PATH = "path";
  /// Absolute directory the FlowFile belongs to; never contains the filename.
  MINIFIAPI static constexpr std::string_view ABSOLUTE_PATH = "absolute.path";
  /// Name of the FlowFile, free of any directory structure.
  MINIFIAPI static constexpr std::string_view FILENAME = "filename";
  /// UUID uniquely assigned to the FlowFile.
  MINIFIAPI static constexpr std::string_view UUID = "uuid";
  /// Numeric priority of the FlowFile.
  MINIFIAPI static constexpr std::string_view priority = "priority";
  /// MIME type of the FlowFile.
  MINIFIAPI static constexpr std::string_view MIME_TYPE = "mime.type";
  /// Reason the FlowFile is being discarded.
  MINIFIAPI static constexpr std::string_view DISCARD_REASON = "discard.reason";
  /// Identifier, other than the UUID, known to refer to this FlowFile.
  MINIFIAPI static constexpr std::string_view ALTERNATE_IDENTIFIER = "alternate.identifier";
  /// Flow identifier.
  MINIFIAPI static constexpr std::string_view FLOW_ID = "flow.id";

  /**
   * Lists every special attribute name declared above.
   * @return the names, in declaration order
   */
  static constexpr std::array<std::string_view, 9> getSpecialFlowAttributes() {
    using AttributeNames = std::array<std::string_view, 9>;
    return AttributeNames{PATH, ABSOLUTE_PATH, FILENAME,
                          UUID, priority, MIME_TYPE,
                          DISCARD_REASON, ALTERNATE_IDENTIFIER, FLOW_ID};
  }
};
}  // namespace org::apache::nifi::minifi::core