site/api/lib/elastic.lua (200 lines of code) (raw):

--[[ Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. ]]-- -- This is elastic.lua - ElasticSearch library local http = require 'socket.http' local JSON = require 'cjson' local config = require 'lib/config' local mime = require 'mime' local default_doc = "mbox" -- http code return check -- N.B. if the index is closed, ES returns 403, but that may perhaps be true for other conditions -- ES returns 404 if the index is missing -- ES also returns 404 if a document is missing local function checkReturn(code, ok404) if type(code) == "number" then -- we have a valid HTTP status code -- ignore expected return codes here -- index returns 201 when an entry is created if code ~= 200 and code ~= 201 and not (ok404 and code == 404) then -- code is called by 2nd-level functions only, so level 4 is the external caller error("Backend Database returned code " .. code .. "!", 4) end else error("Could not contact database backend: " .. code .. "!", 4) end end -- DO common request processing: -- Encode JSON (as necessary) -- Issue request -- Check return code -- Decode JSON response -- -- Parameters: -- - url (required) -- - query (optional); if this is a table it is decoded into JSON -- - ok404 (optional); if true, then 404 is allowed as a status return -- returns decoded JSON result -- may throw an error if the request fails -- Returns: -- json, status code (i.e. 200,201 or 404) local function performRequest(url, query, ok404) local js = query if type(query) == "table" then js = JSON.encode(query) end local result, hc = http.request(url, js) checkReturn(hc, ok404) local json = JSON.decode(result) return json, hc end -- Simple ES delete request -- returns status code only local function performDelete(url, ok404) local _, hc = http.request{ url = url, method = 'DELETE' } checkReturn(hc, ok404) return hc end -- Standard ES query, returns $size results of any doc of type $doc, sorting by $sitem local function getHits(query, size, doc, sitem) doc = doc or "mbox" sitem = sitem or "epoch" size = size or 10 query = query:gsub(" ", "+") local url = config.es_url .. doc .. "/_search?q="..query.."&sort=" .. sitem .. ":desc&size=" .. size local json = performRequest(url) local out = {} if json and json.hits and json.hits.hits then local hasBody = (doc == "mbox") for k, v in pairs(json.hits.hits) do v._source.request_id = v._id if hasBody and v._source.body == JSON.null then v._source.body = '' end table.insert(out, v._source) end end return out end -- Get a single document local function getDoc (ty, id, ok404) local url = config.es_url .. ty .. "/" .. id local json, status = performRequest(url, nil, ok404) if json and json._source then json._source.request_id = json._id if ty == "mbox" and json._source.body == JSON.null then json._source.body = '' end if ty == "mbox_source" then local src = json._source.source -- could it be base64 encoded? -- Unencoded source must contain at least one space; b64 does not if #src % 4 == 0 and src:find(' ') == nil then src = (mime.unb64(src)) if src ~= nil then json._source.source = src end end end end return (json and json._source) and json._source or {}, status end -- Get results (a'la getHits), but only return email headers, not the body -- provides faster transport when we don't need everything local function getHeaders(query, size, doc) doc = doc or "mbox" size = size or 10 query = query:gsub(" ", "+") local url = config.es_url .. doc .. "/_search?_source_exclude=body&q="..query.."&sort=epoch:desc&size=" .. size local json = performRequest(url) local out = {} if json and json.hits and json.hits.hits then for k, v in pairs(json.hits.hits) do v._source.request_id = v._id table.insert(out, v._source) end end return out end -- Same as above, but reverse return order local function getHeadersReverse(query, size, doc) doc = doc or "mbox" size = size or 10 query = query:gsub(" ", "+") local url = config.es_url .. doc .. "/_search?_source_exclude=body&q="..query.."&sort=epoch:desc&size=" .. size local json = performRequest(url) local out = {} if json and json.hits and json.hits.hits then for k, v in pairs(json.hits.hits) do v._source.request_id = v._id table.insert(out, 1, v._source) end end return out end local function contains(table,value) if table then for _,v in pairs(table) do if v == value then return true end end end return false end -- Do a raw ES query with a JSON query local function raw(query, doctype) doctype = doctype or default_doc local url = config.es_url .. doctype .. "/_search" local json = performRequest(url, query) if doctype == "mbox" and json and json.hits and json.hits.hits then -- Check if the query returns the body attribute if contains(query._source, 'body') then local dhh = json.hits.hits for k = 1, #dhh do local v = dhh[k]._source if v.body == JSON.null then v.body = '' end end end end return json or {}, url end -- communicate between scroll calls local queryHasBody = {} --[[ Raw query with scroll Parameters: sidOrQuery - if table, then this is the initial query, otherwise it is the sid doctype - optional document type, only relevant for initial query Returns: json, sid ]] local function scroll(sidOrQuery, doctype) local json local hasBody = false if type(sidOrQuery) == 'table' then local query = sidOrQuery doctype = doctype or default_doc if doctype == "mbox" then -- Check if the query returns the body attribute if contains(query._source, 'body') then hasBody = true end end local url = config.es_url .. doctype .. "/_search?scroll=1m" -- start off the scroll json = performRequest(url, query) else local sid = sidOrQuery hasBody = queryHasBody[sid] queryHasBody[sid] = nil -- drop old entry (sid may change) -- We have to do some gsubbing here, as ES expects us to be at the root of the ES URL -- But in case we're being proxied, let's just cut off the last part of the URL local url = config.es_url:gsub("[^/]+/?$", "") .. "/_search/scroll?scroll=1m&scroll_id=" .. sid -- continue the scroll json = performRequest(url) end if hasBody then -- propagate the setting for the next call queryHasBody[json._scroll_id] = true local dhh = json.hits.hits for k = 1, #dhh do local v = dhh[k]._source if v.body == JSON.null then v.body = '' end end end return json, json._scroll_id end -- delete a scroll id after use local function clear_scroll(sid) local url = config.es_url:gsub("[^/]+/?$", "") .. "/_search/scroll?scroll_id=" .. sid return performDelete(url, true) end -- Update a document local function update(doctype, id, query, consistency) doctype = doctype or default_doc local url = config.es_url .. doctype .. "/" .. id .. "/_update" if consistency then url = url .. "?write_consistency=" .. consistency end local json = performRequest(url, {doc = query }) return json or {}, url end -- Put a new document somewhere local function index(id, ty, body, consistency) if not id then error("id parameter must be provided", 3) end local url = config.es_url .. ty .. "/" .. id if consistency then url = url .. "?write_consistency=" .. consistency end local json = performRequest(url, body) return json or {} end local function setDefault(typ) default_doc = typ end -- module defs return { -- maximum results that can be returned by a query -- above this number, must use scrolling or search_after (ES 5.x) MAX_RESULT_WINDOW = 10000, find = getHits, findFast = getHeaders, findFastReverse = getHeadersReverse, get = getDoc, raw = raw, index = index, default = setDefault, update = update, scroll = scroll, clear_scroll = clear_scroll }