packages/cyberark_epm/data_stream/aggregated_event/agent/stream/cel.yml.hbs (207 lines of code) (raw):
config_version: 2
interval: {{interval}}
resource.tracer:
enabled: {{enable_request_tracer}}
filename: "../../logs/cel/http-request-trace-*.ndjson"
maxbackups: 5
{{#if proxy_url}}
resource.proxy_url: {{proxy_url}}
{{/if}}
{{#if ssl}}
resource.ssl: {{ssl}}
{{/if}}
{{#if http_client_timeout}}
resource.timeout: {{http_client_timeout}}
{{/if}}
resource.url: {{url}}
state:
username: {{username}}
password: {{password}}
initial_interval: {{initial_interval}}
session_timeout: {{session_timeout}}
limit: {{page_size}}
offset: 0
version: {{api_version}}
resource.rate_limit.limit: {{resource_rate_limit_limit}}
resource.rate_limit.burst: {{resource_rate_limit_burst}}
redact:
fields:
- password
- access_token
program: |
(
has(state.expiry) && timestamp(state.expiry) > now ?
{
"access_token": state.access_token,
"expiry": state.expiry,
"manager_url": state.manager_url
}
:
post_request(
state.url.trim_right("/") + "/EPM/API/" + state.version + "/Auth/EPM/Logon", "application/json", {
"Username": state.username,
"Password": state.password,
"ApplicationID": "Elastic Integration CyberArk EPM"
}.encode_json()
).do_request().as(resp, resp.StatusCode == 200 ?
resp.Body.decode_json().as(body, {
"access_token": body.EPMAuthenticationResult,
// Include 30s grace period to manage session expiry.
"expiry": (now() + duration(state.session_timeout) - duration("30s")).format(time_layout.RFC3339),
"manager_url": body.ManagerURL
})
:
{
"events": {
"error": {
"code": string(resp.StatusCode),
"id": string(resp.Status),
"message": "POST /EPM/API/" + state.version + "/Auth/EPM/Logon:"+(
size(resp.Body) != 0 ?
string(resp.Body)
:
string(resp.Status) + ' (' + string(resp.StatusCode) + ')'
),
},
},
"want_more": false,
}
)
).as(token,
has(token.events) ? token : // Exit early due to failure.
token.with(
has(state.worklist) && state.worklist.size() > 0 ?
{
"worklist": state.worklist,
"next": state.next
}
:
request(
"GET",
token.manager_url.trim_right("/") + "/EPM/API/" + state.version + "/Sets?" + {
"Offset": [string(state.offset)],
"Limit": ["1000"]
}.format_query()
).with({
"Header":{
"Authorization": ["basic " + string(token.access_token)],
}
}).do_request().as(resp, resp.StatusCode == 200 ?
resp.Body.decode_json().as(body, {
"worklist": body.Sets.map(e, e.Id),
"next": 0,
})
:
{
"events": {
"error": {
"code": string(resp.StatusCode),
"id": string(resp.Status),
"message": "GET /EPM/API/" + state.version + "/Sets:"+(
size(resp.Body) != 0 ?
string(resp.Body)
:
string(resp.Status) + ' (' + string(resp.StatusCode) + ')'
),
},
},
"want_more": false,
}
)
)
).as(token, token.with({
"current_time": state.?want_more.orValue(false) ? state.current_time : now.format(time_layout.RFC3339)
})).as(token,
has(token.events) ? token : // Exit early due to failure.
state.with(
has(token.worklist) && token.worklist.size() > 0 ?
post_request(
token.manager_url.trim_right("/") + "/EPM/API/" + state.version + "/Sets/" + token.worklist[token.next] + "/events/aggregations/search?" + {
"nextCursor": [state.?next_cursor.orValue("start")],
"limit": [string(state.limit)]
}.format_query(),
"application/json",
{
"start_time": state.?cursor.last_timestamp.orValue((timestamp(token.current_time) - duration(state.initial_interval)).format(time_layout.RFC3339)),
"end_time": token.current_time
}.as(filter, {
"filter": "eventDate GE " + filter.start_time + " AND eventDate LE " + filter.end_time
}.encode_json())
).with({
"Header":{
"Authorization": ["basic " + string(token.access_token)],
"Content-Type": ["application/json"],
}
}).do_request().as(resp, resp.StatusCode == 200 ?
resp.Body.decode_json().as(body, {
"events": (
has(body.events) && body.events.size() > 0 ?
body.events.map(e, {
"message": e.encode_json(),
})
:
[{"message":"retry"}]
),
"access_token": token.access_token,
"expiry": token.expiry,
"manager_url": token.manager_url,
"worklist": body.?nextCursor.orValue(null) == null && int(token.next) + 1 >= token.worklist.size() ? [] : token.worklist,
"next": (
body.?nextCursor.orValue(null) != null ?
token.next
:
int(token.next) + 1 < token.worklist.size() ?
int(token.next) + 1
:
0
),
"current_time": token.current_time,
"offset": body.?nextCursor.orValue(null) == null && int(token.next) + 1 >= token.worklist.size() ? int(state.offset) + token.worklist.size() : state.offset,
"next_cursor": body.?nextCursor.orValue(null) != null ? body.nextCursor : "start",
"want_more": true
})
:
{
"events": {
"error": {
"code": string(resp.StatusCode),
"id": string(resp.Status),
"message": "POST /events/aggregations/search:"+(
size(resp.Body) != 0 ?
string(resp.Body)
:
string(resp.Status) + ' (' + string(resp.StatusCode) + ')'
),
},
},
"want_more": false,
}
)
:
{
"events": [{"message": "retry"}],
"cursor": {
"last_timestamp": token.current_time
},
"want_more": false,
"offset": 0
}
)
)
tags:
{{#if preserve_original_event}}
- preserve_original_event
{{/if}}
{{#if preserve_duplicate_custom_fields}}
- preserve_duplicate_custom_fields
{{/if}}
{{#each tags as |tag|}}
- {{tag}}
{{/each}}
{{#contains "forwarded" tags}}
publisher_pipeline.disable_host: true
{{/contains}}
{{#if processors}}
processors:
{{processors}}
{{/if}}