packages/cyberark_epm/data_stream/admin_audit/agent/stream/cel.yml.hbs (202 lines of code) (raw):
# CEL input settings for the CyberArk EPM admin audit data stream.
config_version: 2
# How often the program below is evaluated.
interval: {{interval}}
# HTTP request tracing; switched on or off by the enable_request_tracer variable.
resource.tracer:
  enabled: {{enable_request_tracer}}
  filename: "../../logs/cel/http-request-trace-*.ndjson"
  maxbackups: 5
{{#if proxy_url}}
resource.proxy_url: {{proxy_url}}
{{/if}}
{{#if ssl}}
resource.ssl: {{ssl}}
{{/if}}
{{#if http_client_timeout}}
resource.timeout: {{http_client_timeout}}
{{/if}}
# Base URL; the program reads it as state.url to build the Logon endpoint.
resource.url: {{url}}
# Initial state passed to the program. These keys must be nested under
# `state` because the program reads them as state.<name>.
state:
  # Credentials sent in the Logon request body.
  # NOTE(review): these values are rendered unquoted; a value containing
  # YAML-special characters may not parse as a string. Confirm how the
  # package escapes them.
  username: {{username}}
  password: {{password}}
  # Look-back window used for DateFrom when no cursor has been stored yet.
  initial_interval: {{initial_interval}}
  # Session lifetime used to compute the cached token's expiry.
  session_timeout: {{session_timeout}}
  # Page size for AdminAudit requests (sent as the Limit query parameter).
  limit: {{page_size}}
  # Offset into the Sets listing (sent as the Offset query parameter).
  offset: 0
  # API version segment inserted after /EPM/API/ in every request path.
  version: {{api_version}}
resource.rate_limit.limit: {{resource_rate_limit_limit}}
resource.rate_limit.burst: {{resource_rate_limit_burst}}
# State fields to be redacted when the state is logged.
redact:
  fields:
    - password
    - access_token
# CEL program. Each evaluation runs three stages:
#   1. Session: reuse the cached access token while it is unexpired,
#      otherwise log on and cache a new token and the manager URL.
#   2. Worklist: reuse the list of set IDs held in state, otherwise fetch
#      one page of sets from the manager URL.
#   3. Events: request one page of AdminAudit records for the current set
#      and advance the paging state (next set index, per-set event offset,
#      Sets offset). When the worklist is empty the cursor is moved to the
#      end of the collection window and the evaluation loop stops.
# Failures are reported as a single "events" object holding an "error"
# field, with want_more set to false.
program: |
  (
    has(state.expiry) && timestamp(state.expiry) > now ?
      // Cached session is still valid; reuse it.
      {
        "access_token": state.access_token,
        "expiry": state.expiry,
        "manager_url": state.manager_url
      }
    :
      post_request(
        state.url.trim_right("/") + "/EPM/API/" + state.version + "/Auth/EPM/Logon", "application/json", {
          "Username": state.username,
          "Password": state.password,
          "ApplicationID": "Elastic Integration CyberArk EPM"
        }.encode_json()
      ).do_request().as(resp, resp.StatusCode == 200 ?
        resp.Body.decode_json().as(body, {
          "access_token": body.EPMAuthenticationResult,
          // Include 30s grace period to manage session expiry.
          "expiry": (now() + duration(state.session_timeout) - duration("30s")).format(time_layout.RFC3339),
          "manager_url": body.ManagerURL
        })
      :
        {
          "events": {
            "error": {
              "code": string(resp.StatusCode),
              "id": string(resp.Status),
              "message": "POST /EPM/API/" + state.version + "/Auth/EPM/Logon:"+(
                size(resp.Body) != 0 ?
                  string(resp.Body)
                :
                  string(resp.Status) + ' (' + string(resp.StatusCode) + ')'
              ),
            },
          },
          "want_more": false,
        }
      )
  ).as(token,
    has(token.events) ? token : // Logon failed; skip the Sets request.
    token.with(
      has(state.worklist) && state.worklist.size() > 0 ?
        // Still working through a previously fetched page of sets.
        {
          "worklist": state.worklist,
          "next": state.next
        }
      :
        request(
          "GET",
          token.manager_url.trim_right("/") + "/EPM/API/" + state.version + "/Sets?" + {
            "Offset": [string(state.offset)],
            "Limit": ["1000"]
          }.format_query()
        ).with({
          "Header":{
            "Authorization": ["basic " + string(token.access_token)],
          }
        }).do_request().as(resp, resp.StatusCode == 200 ?
          resp.Body.decode_json().as(body, {
            "worklist": body.Sets.map(e, e.Id),
            "next": 0,
          })
        :
          {
            "events": {
              "error": {
                "code": string(resp.StatusCode),
                "id": string(resp.Status),
                "message": "GET /EPM/API/" + state.version + "/Sets:"+(
                  size(resp.Body) != 0 ?
                    string(resp.Body)
                  :
                    string(resp.Status) + ' (' + string(resp.StatusCode) + ')'
                ),
              },
            },
            "want_more": false,
          }
        )
    )
  ).as(token, token.with({
    // Keep the same upper time bound while paging within one collection
    // cycle; start a new window otherwise.
    "current_time": state.?want_more.orValue(false) ? state.current_time : now.format(time_layout.RFC3339)
  })).as(token,
    // Exit early due to failure. The error is merged into state rather than
    // returned alone so that the configured values (url, credentials, paging
    // settings) are still present in the state used by the next evaluation.
    has(token.events) ? state.with(token) :
    state.with(
      has(token.worklist) && token.worklist.size() > 0 ?
        request(
          "GET",
          token.manager_url.trim_right("/") + "/EPM/API/" + state.version + "/Sets/" + token.worklist[int(token.next)] + "/AdminAudit?" + {
            "Offset": [string(state.?total_events.orValue(0))],
            "Limit": [string(state.limit)],
            "DateFrom": [state.?cursor.last_timestamp.orValue((timestamp(token.current_time) - duration(state.initial_interval)).format(time_layout.RFC3339))],
            "DateTo": [token.current_time]
          }.format_query()
        ).with({
          "Header":{
            "Authorization": ["basic " + string(token.access_token)]
          }
        }).do_request().as(resp, resp.StatusCode == 200 ?
          resp.Body.decode_json().as(body,
            // A response without AdminAudits is treated as an empty page.
            body.?AdminAudits.orValue([]).as(audits,
              // Events fetched so far for the current set, including this page.
              (int(state.?total_events.orValue(0)) + audits.size()).as(fetched,
                // The set is finished when everything reported by TotalCount
                // has been fetched. An empty page also finishes it, so that a
                // count mismatch cannot cause the same page to be requested
                // again and again.
                (audits.size() == 0 || fetched >= int(body.TotalCount)).as(set_done,
                  // The worklist is finished when the last set in it is done.
                  (set_done && int(token.next) + 1 >= token.worklist.size()).as(list_done, {
                    "events": (
                      audits.size() > 0 ?
                        audits.map(e, {
                          "message": e.encode_json(),
                        })
                      :
                        // Placeholder emitted for an empty page.
                        // NOTE(review): presumably discarded downstream and
                        // present only so the evaluation loop continues; confirm.
                        [{"message":"retry"}]
                    ),
                    "access_token": token.access_token,
                    "expiry": token.expiry,
                    "manager_url": token.manager_url,
                    "worklist": list_done ? [] : token.worklist,
                    "next": (
                      !set_done ?
                        token.next
                      :
                        int(token.next) + 1 < token.worklist.size() ?
                          int(token.next) + 1
                        :
                          0
                    ),
                    "current_time": token.current_time,
                    // Move the Sets offset past this worklist once it is finished.
                    "offset": list_done ? int(state.offset) + token.worklist.size() : state.offset,
                    "total_events": set_done ? 0 : fetched,
                    "want_more": true
                  })
                )
              )
            )
          )
        :
          {
            "events": {
              "error": {
                "code": string(resp.StatusCode),
                "id": string(resp.Status),
                "message": "GET /AdminAudit:"+(
                  size(resp.Body) != 0 ?
                    string(resp.Body)
                  :
                    string(resp.Status) + ' (' + string(resp.StatusCode) + ')'
                ),
              },
            },
            "want_more": false,
          }
        )
      :
        // No sets left to process: store the end of the window as the cursor,
        // reset the Sets offset, and stop until the next interval.
        {
          "events": [{"message": "retry"}],
          "cursor": {
            "last_timestamp": token.current_time
          },
          "want_more": false,
          "offset": 0
        }
    )
  )
# Tags attached to every event: optional behavior flags first, then the
# user-configured tags.
tags:
{{#if preserve_original_event}}
  - preserve_original_event
{{/if}}
{{#if preserve_duplicate_custom_fields}}
  - preserve_duplicate_custom_fields
{{/if}}
{{#each tags as |tag|}}
  - {{tag}}
{{/each}}
# Emitted only when the configured tags contain "forwarded".
{{#contains "forwarded" tags}}
publisher_pipeline.disable_host: true
{{/contains}}
# User-supplied processors, inserted as provided.
{{#if processors}}
processors:
{{processors}}
{{/if}}