in extensions/elasticsearch/PostElasticsearch.cpp [90:125]
/**
 * Builds a single ElasticPayload from the processor properties and the flow file content.
 *
 * The Action, Index and Identifier properties are looked up with the flow file passed along.
 * Depending on the action the payload is assembled as follows:
 *   - "index" / "create": the flow file content, parsed as JSON, is the payload itself.
 *   - "update":           the parsed content is wrapped as {"doc": <content>}.
 *   - "upsert":           converted to an "update" action with {"doc": <content>, "doc_as_upsert": true}.
 *   - "delete":           no payload is produced.
 *
 * @param session   used to read the flow file content
 * @param context   source of the Action, Index and Identifier properties
 * @param flow_file flow file whose content (and property lookups) drive the payload
 * @return the assembled ElasticPayload, or an error message when
 *         - the action is missing or not one of index/create/delete/update/upsert,
 *         - the index is missing,
 *         - the identifier is missing for delete/update/upsert,
 *         - reading the flow file through utils::JsonInputCallback reports a negative result.
 */
static nonstd::expected<ElasticPayload, std::string> parse(core::ProcessSession& session, core::ProcessContext& context, const std::shared_ptr<core::FlowFile>& flow_file) {
  auto action = context.getProperty(PostElasticsearch::Action, flow_file.get());
  if (!action || (action != "index" && action != "create" && action != "delete" && action != "update" && action != "upsert"))
    return nonstd::make_unexpected("Missing or invalid action");

  auto index = context.getProperty(PostElasticsearch::Index, flow_file.get());
  if (!index)
    return nonstd::make_unexpected("Missing index");

  // The identifier is optional for index/create but mandatory for the other actions.
  auto id = context.getProperty(PostElasticsearch::Identifier, flow_file.get()) | utils::toOptional();
  if (!id && (action == "delete" || action == "update" || action == "upsert"))
    return nonstd::make_unexpected("Identifier is required for DELETE,UPDATE and UPSERT actions");

  // Stays empty for "delete", which carries no request body.
  std::optional<rapidjson::Document> payload;
  if (action == "index" || action == "create") {
    // The flow file content is parsed straight into the payload document.
    payload = rapidjson::Document(rapidjson::kObjectType);
    utils::JsonInputCallback callback(*payload);
    if (session.read(flow_file, std::ref(callback)) < 0) {
      return nonstd::make_unexpected("invalid flowfile content");
    }
  } else if (action == "update" || action == "upsert") {
    payload = rapidjson::Document(rapidjson::kObjectType);
    // doc_member shares payload's allocator so it can be handed over to payload below.
    rapidjson::Document doc_member(rapidjson::kObjectType, &payload->GetAllocator());
    utils::JsonInputCallback callback(doc_member);
    if (session.read(flow_file, std::ref(callback)) < 0) {
      return nonstd::make_unexpected("invalid flowfile content");
    }
    const bool is_upsert = (action == "upsert");
    payload->AddMember("doc", doc_member, payload->GetAllocator());
    if (is_upsert) {
      // Elasticsearch has no "upsert" bulk action: it is an "update" whose body carries
      // "doc_as_upsert" next to "doc". Placing the flag inside "doc" would store it as a
      // regular document field and the request would not be treated as an upsert.
      action = "update";
      payload->AddMember("doc_as_upsert", true, payload->GetAllocator());
    }
  }

  return ElasticPayload(std::move(*action), std::move(*index), std::move(id), std::move(payload));
}