static nonstd::expected parse()

in extensions/elasticsearch/PostElasticsearch.cpp [90:125]


  static nonstd::expected<ElasticPayload, std::string> parse(core::ProcessSession& session, core::ProcessContext& context, const std::shared_ptr<core::FlowFile>& flow_file) {
    auto action = context.getProperty(PostElasticsearch::Action, flow_file.get());
    if (!action || (action != "index" && action != "create" && action != "delete" && action != "update" && action != "upsert"))
      return nonstd::make_unexpected("Missing or invalid action");

    auto index = context.getProperty(PostElasticsearch::Index, flow_file.get());
    if (!index)
      return nonstd::make_unexpected("Missing index");

    auto id = context.getProperty(PostElasticsearch::Identifier, flow_file.get()) | utils::toOptional();
    if (!id && (action == "delete" || action == "update" || action == "upsert"))
      return nonstd::make_unexpected("Identifier is required for DELETE,UPDATE and UPSERT actions");

    std::optional<rapidjson::Document> payload;
    if (action == "index" || action == "create") {
      payload = rapidjson::Document(rapidjson::kObjectType);
      utils::JsonInputCallback callback(*payload);
      if (session.read(flow_file, std::ref(callback)) < 0) {
        return nonstd::make_unexpected("invalid flowfile content");
      }
    }
    if (action == "update" || action == "upsert") {
      payload = rapidjson::Document(rapidjson::kObjectType);
      rapidjson::Document doc_member(rapidjson::kObjectType, &payload->GetAllocator());
      utils::JsonInputCallback callback(doc_member);
      if (session.read(flow_file, std::ref(callback)) < 0) {
        return nonstd::make_unexpected("invalid flowfile content");
      }
      if (action == "upsert") {
        action = "update";
        doc_member.AddMember("doc_as_upsert", true, doc_member.GetAllocator());
      }
      payload->AddMember("doc", doc_member, payload->GetAllocator());
    }
    return ElasticPayload(std::move(*action), std::move(*index), std::move(id), std::move(payload));
  }