packages/cyberark_epm/data_stream/aggregated_event/agent/stream/cel.yml.hbs (207 lines of code) (raw):

# CEL input template for the CyberArk EPM aggregated_event data stream.
# The program logs on to EPM, lists the available Sets, and then pages through
# the aggregated events of each Set for the current time window.
config_version: 2
interval: {{interval}}
resource.tracer:
  enabled: {{enable_request_tracer}}
  filename: "../../logs/cel/http-request-trace-*.ndjson"
  maxbackups: 5
# Optional HTTP client settings; each is rendered only when its variable is set.
{{#if proxy_url}}
resource.proxy_url: {{proxy_url}}
{{/if}}
{{#if ssl}}
resource.ssl: {{ssl}}
{{/if}}
{{#if http_client_timeout}}
resource.timeout: {{http_client_timeout}}
{{/if}}
resource.url: {{url}}
# Values read by the program as state.<name>.
state:
  username: {{username}}
  password: {{password}}
  # Look-back used for the start of the window when no cursor is stored.
  initial_interval: {{initial_interval}}
  # Lifetime assumed for a logon session; the program subtracts 30s from it.
  session_timeout: {{session_timeout}}
  # Page size sent as the limit query parameter of the events search.
  limit: {{page_size}}
  # Offset used when listing Sets; advanced by the program after each worklist
  # is finished and reset to 0 at the end of a cycle.
  offset: 0
  # API version segment inserted into every request path.
  version: {{api_version}}
resource.rate_limit.limit: {{resource_rate_limit_limit}}
resource.rate_limit.burst: {{resource_rate_limit_burst}}
# State fields to redact (the credential and the session token).
redact:
  fields:
    - password
    - access_token
program: |
  // Stage 1: obtain a session.
  // Reuse the stored token while state.expiry is still in the future;
  // otherwise log on with the configured credentials.
  (
    has(state.expiry) && timestamp(state.expiry) > now ?
      {
        "access_token": state.access_token,
        "expiry": state.expiry,
        "manager_url": state.manager_url
      }
    :
      post_request(
        state.url.trim_right("/") + "/EPM/API/" + state.version + "/Auth/EPM/Logon",
        "application/json",
        {
          "Username": state.username,
          "Password": state.password,
          "ApplicationID": "Elastic Integration CyberArk EPM"
        }.encode_json()
      ).do_request().as(resp, resp.StatusCode == 200 ?
        resp.Body.decode_json().as(body, {
          "access_token": body.EPMAuthenticationResult,
          // Include 30s grace period to manage session expiry.
          "expiry": (now() + duration(state.session_timeout) - duration("30s")).format(time_layout.RFC3339),
          // Later requests are sent to the manager URL returned by the logon call.
          "manager_url": body.ManagerURL
        })
      :
        // Non-200 logon: return an error object and set want_more to false.
        // The message carries the response body, or the status line when the body is empty.
        {
          "events": {
            "error": {
              "code": string(resp.StatusCode),
              "id": string(resp.Status),
              "message": "POST /EPM/API/" + state.version + "/Auth/EPM/Logon:"+(
                size(resp.Body) != 0 ?
                  string(resp.Body)
                :
                  string(resp.Status) + ' (' + string(resp.StatusCode) + ')'
              ),
            },
          },
          "want_more": false,
        }
      )
  // NOTE(review): the early exit below returns only the error object, not
  // state.with(...); confirm the input still provides the configured state
  // fields on the next interval.
  ).as(token, has(token.events) ? token : // Exit early due to failure.
    // Stage 2: build the worklist of Set IDs.
    // Keep an in-progress worklist from state; otherwise list Sets starting at
    // state.offset (Limit is fixed at 1000) and start at index 0.
    token.with(
      has(state.worklist) && state.worklist.size() > 0 ?
        {
          "worklist": state.worklist,
          "next": state.next
        }
      :
        request(
          "GET",
          token.manager_url.trim_right("/") + "/EPM/API/" + state.version + "/Sets?" + {
            "Offset": [string(state.offset)],
            "Limit": ["1000"]
          }.format_query()
        ).with({
          "Header":{
            "Authorization": ["basic " + string(token.access_token)],
          }
        }).do_request().as(resp, resp.StatusCode == 200 ?
          resp.Body.decode_json().as(body, {
            "worklist": body.Sets.map(e, e.Id),
            "next": 0,
          })
        :
          // Non-200 Sets listing: same error shape as the logon failure.
          {
            "events": {
              "error": {
                "code": string(resp.StatusCode),
                "id": string(resp.Status),
                "message": "GET /EPM/API/" + state.version + "/Sets:"+(
                  size(resp.Body) != 0 ?
                    string(resp.Body)
                  :
                    string(resp.Status) + ' (' + string(resp.StatusCode) + ')'
                ),
              },
            },
            "want_more": false,
          }
        )
    )
  // Stage 3: pin the end of the query window.
  // When the previous evaluation set want_more to true, reuse the stored
  // current_time; otherwise take a fresh timestamp.
  ).as(token, token.with({
    "current_time": state.?want_more.orValue(false) ? state.current_time : now.format(time_layout.RFC3339)
  })).as(token, has(token.events) ? token : // Exit early due to failure.
    // Stage 4: search aggregated events for the current Set, or finish the
    // cycle when the worklist is empty.
    state.with(
      has(token.worklist) && token.worklist.size() > 0 ?
        post_request(
          token.manager_url.trim_right("/") + "/EPM/API/" + state.version + "/Sets/" + token.worklist[token.next] + "/events/aggregations/search?" + {
            // Paging cursor defaults to "start" when none is stored.
            "nextCursor": [state.?next_cursor.orValue("start")],
            "limit": [string(state.limit)]
          }.format_query(),
          "application/json",
          {
            // Window start: the stored cursor timestamp, or current_time minus
            // initial_interval when no cursor is stored.
            "start_time": state.?cursor.last_timestamp.orValue((timestamp(token.current_time) - duration(state.initial_interval)).format(time_layout.RFC3339)),
            "end_time": token.current_time
          }.as(filter, {
            "filter": "eventDate GE " + filter.start_time + " AND eventDate LE " + filter.end_time
          }.encode_json())
        ).with({
          "Header":{
            "Authorization": ["basic " + string(token.access_token)],
            "Content-Type": ["application/json"],
          }
        }).do_request().as(resp, resp.StatusCode == 200 ?
          resp.Body.decode_json().as(body, {
            // Each returned event is published as its JSON encoding. An empty
            // page yields a single placeholder event instead.
            // NOTE(review): the placeholder is presumably dropped by the
            // ingest pipeline; confirm.
            "events": (
              has(body.events) && body.events.size() > 0 ?
                body.events.map(e, {
                  "message": e.encode_json(),
                })
              :
                [{"message":"retry"}]
            ),
            // Carry the session forward so the next evaluation can reuse it.
            "access_token": token.access_token,
            "expiry": token.expiry,
            "manager_url": token.manager_url,
            // Clear the worklist once the last Set has no further page.
            "worklist": body.?nextCursor.orValue(null) == null && int(token.next) + 1 >= token.worklist.size() ?
              []
            :
              token.worklist,
            // Stay on the same Set while it has another page; otherwise move
            // to the next Set, going back to 0 after the last one.
            "next": (
              body.?nextCursor.orValue(null) != null ?
                token.next
              : int(token.next) + 1 < token.worklist.size() ?
                int(token.next) + 1
              :
                0
            ),
            "current_time": token.current_time,
            // After the last Set is finished, advance the Sets offset by the
            // number of Sets in this worklist.
            "offset": body.?nextCursor.orValue(null) == null && int(token.next) + 1 >= token.worklist.size() ?
              int(state.offset) + token.worklist.size()
            :
              state.offset,
            // Keep the returned cursor, or reset to "start" when there is none.
            "next_cursor": body.?nextCursor.orValue(null) != null ? body.nextCursor : "start",
            "want_more": true
          })
        :
          // Non-200 search: error object merged into state, want_more false.
          {
            "events": {
              "error": {
                "code": string(resp.StatusCode),
                "id": string(resp.Status),
                "message": "POST /events/aggregations/search:"+(
                  size(resp.Body) != 0 ?
                    string(resp.Body)
                  :
                    string(resp.Status) + ' (' + string(resp.StatusCode) + ')'
                ),
              },
            },
            "want_more": false,
          }
        )
      :
        // Empty worklist: emit a placeholder event, store current_time as the
        // cursor timestamp, set want_more to false and reset the Sets offset.
        {
          "events": [{"message": "retry"}],
          "cursor": {
            "last_timestamp": token.current_time
          },
          "want_more": false,
          "offset": 0
        }
    )
  )
# Tags: optional flag tags first, then every user-supplied tag.
tags:
{{#if preserve_original_event}}
  - preserve_original_event
{{/if}}
{{#if preserve_duplicate_custom_fields}}
  - preserve_duplicate_custom_fields
{{/if}}
{{#each tags as |tag|}}
  - {{tag}}
{{/each}}
# Set publisher_pipeline.disable_host when the tags include forwarded.
{{#contains "forwarded" tags}}
publisher_pipeline.disable_host: true
{{/contains}}
# User-supplied processors, rendered only when provided.
{{#if processors}}
processors:
{{processors}}
{{/if}}