packages/snyk/data_stream/audit_logs/agent/stream/cel.yml.hbs (213 lines of code) (raw):
# Top-level CEL input settings and HTTP resource options.
# Optional settings are only rendered when the corresponding
# integration variable is set.
config_version: 2
interval: {{interval}}
{{#if enable_request_tracer}}
# Request tracing: both options belong to the CEL input's
# `resource.tracer` namespace (`request.tracer` is the httpjson input's
# namespace and is not read by the CEL input).
resource.tracer.filename: "../../logs/cel/http-request-trace-*.ndjson"
resource.tracer.maxbackups: 5
{{/if}}
{{#if proxy_url}}
resource.proxy_url: {{proxy_url}}
{{/if}}
{{#if ssl}}
resource.ssl: {{ssl}}
{{/if}}
{{#if http_client_timeout}}
resource.timeout: {{http_client_timeout}}
{{/if}}
# Base URL of the API; the program reads it as state.url.
resource.url: {{url}}
# Initial evaluation state handed to the CEL program. Every key below is
# read by the program as state.<key>; optional keys are only rendered
# when the corresponding integration variable is set.
state:
  initial_interval: {{initial_interval}}
  # Path prefix of the audit endpoint; the program compares it with
  # "/rest/orgs/" to detect organisation-level collection.
  end_point_type: {{audit_type}}
  # Keep version in sync with the value in the README.
  version: "2024-04-29"
  audit_id: {{audit_id}}
  api_token: {{api_token}}
{{#if first_interval}}
  # Look-back duration used for the "from" parameter when no cursor exists.
  lookback: {{first_interval}}
{{/if}}
{{#if user_id}}
  user_id: {{user_id}}
{{/if}}
{{#if project_id}}
  project_id: {{project_id}}
{{/if}}
{{#if event}}
  # The program reads this list as state.?events_filter, so the key name
  # must be exactly `events_filter` for the filter to be applied.
  events_filter:
{{#each event as |e|}}
    - {{e}}
{{/each}}
{{/if}}
{{#if batch_size}}
  # Sent as the "size" query parameter.
  size: {{batch_size}}
{{/if}}
  want_more: false
# State fields named for redaction by the input: the organisation/group
# identifier and the API token (see the CEL input `redact` option).
redact:
  fields:
    - audit_id
    - api_token
# CEL program evaluated by the input.
#
# Each evaluation:
#   1. builds a list of items to query (pending work-list, all orgs,
#      legacy single cursor, multi-org cursor, or a fresh collection),
#   2. issues one audit log search request per item,
#   3. folds the per-item results into the new cursor, the work-list of
#      pending "next" links, the events to publish and the want_more flag.
program: |
  state.with(
    {
      "Header": {
        "Accept": ["application/vnd.api+json"],
        "Authorization": ["Token " + state.api_token],
      }
    }.as(auth_header,
      (
        // Only take the work-list branch when there is pending work. The
        // result below always sets "work_list", possibly to an empty
        // list; without the size check an empty list left in the state
        // would select this branch on every evaluation and no request
        // would ever be issued again.
        has(state.work_list) && size(state.work_list) > 0 ?
          // We have a work-list, do that first.
          // Work-lists do not allow partial requests on a work-list.
          // I think this needs to allow head()/tail() approach. This
          // will be possible when kibana.version is 8.15 or better.
          state.work_list
        : (state.audit_id == "ALL" && state.end_point_type == "/rest/orgs/") ?
          // Otherwise, we have a multi-org request...
          // NOTE(review): a non-200 response to the org listing yields an
          // empty list here, so the failure produces no error event.
          get_request(
            state.url.trim_right("/") + "/rest/orgs?" + {
              "version": [state.version],
              // The /rest/orgs endpoint returns between 10 and 100 results per
              // request (multiples of 10 only), defaulting to 10. For simplicity
              // we'll initially always limit to 100. In future we can paginate
              // over multiple requests if there is demand, possibly with a
              // configurable limit.
              "limit": ['100'],
            }.format_query()
          ).with(auth_header).do_request().as(resp, resp.StatusCode != 200 ? [] :
            // TODO: Remove unnecessary bytes conversion when kibana.version is at least v8.15.0.
            bytes(resp.Body).decode_json().data.map(org, {
              "id": org.id,
              // Migrating from single org to multi-org will lose cursor last_created.
              // This cannot be worked around since the multi-org mark clobbers
              // the ID and we cannot know which of the new orgs corresponds to the
              // existing last_created value.
              ?"last_created": state.?cursor[org.id].last_created,
            })
          )
        : has(state.?cursor.last_created) ?
          // ... a single legacy cursor, ...
          [{
            "id": state.audit_id,
            "last_created": state.cursor.last_created,
          }]
        : has(state.cursor) && state.end_point_type == "/rest/orgs/" ?
          // ... a multi-org cursor, ...
          state.cursor.map(audit_id, state.cursor[audit_id].with({"id": audit_id}))
        :
          // ... or a new collection.
          [{"id": state.audit_id}]
      ).map(item,
        get_request(
          // Follow the item's "next" link when it has one, otherwise
          // build a fresh search query for the item.
          state.url.trim_right("/") + item.?next.orValue(
            state.end_point_type + item.id + "/audit_logs/search?" + {
              "version": [state.version],
              "sort_order": ['ASC'],
              ?"from": has(item.last_created) ?
                // Step past the last event by the smallest
                // experimentally determined interval. Is this safe?
                // The alternative is to recollect the last event of
                // the previous collection.
                optional.of([string(timestamp(item.last_created)+duration("1us"))])
              : has(state.lookback) ?
                optional.of([string(now-duration(state.lookback))])
              :
                optional.none(),
              ?"size": has(state.size) ?
                optional.of([string(int(state.size))])
              :
                optional.none(),
              ?"user_id": has(state.user_id) ?
                optional.of([state.user_id])
              :
                optional.none(),
              ?"project_id": has(state.project_id) ?
                optional.of([state.project_id])
              :
                optional.none(),
              ?"events": state.?events_filter,
            }.format_query()
          )
        ).with(auth_header).do_request().as(resp, resp.StatusCode != 200 ?
          // Failed request: publish a single error event for this item
          // and leave its cursor untouched.
          {
            "id": item.id,
            "events": [{
              "error": {
                "code": string(resp.StatusCode),
                "id": string(resp.Status),
                "message": size(resp.Body) != 0 ?
                  string(resp.Body)
                :
                  string(resp.Status) + ' (' + string(resp.StatusCode) + ')',
              }
            }],
            "want_more": false,
          }
        :
          // TODO: Remove unnecessary bytes conversion when kibana.version is at least v8.15.0.
          bytes(resp.Body).decode_json().as(body, !has(body.?data.items) ?
            {
              "id": item.id,
              "events":[],
              "want_more": false,
            }
          :
            {
              "id": item.id,
              "events": body.data.items.map(item, {
                "message": item.encode_json()
              }),
              "cursor": {
                "id": item.id,
                ?"next": body.?links.next,
                // This could be
                //
                //  size(body.data.items) == 0 ?
                //    [item.?last_created.orValue(now)]
                //  :
                //    body.data.items[size(body.data.items)-1]
                //
                // if sort_order=ASC is reliable.
                "last_created": body.data.items.map(item,
                  has(item.created), timestamp(item.created)
                ).as(times, size(times) == 0 ? item.?last_created.orValue(now) : times.max()),
              },
              "want_more": has(body.?links.next),
            }
          )
        )
      ).as(result, {
        // The cursor cannot contain the next link since it may be
        // stale by the time we revisit the cursor elements.
        // Make sure we remove legacy cursor time in last_created.
        // TODO: Replace the line below with the following when kibana.version is at least v8.15.0.
        // "cursor": state.?cursor.orValue({}).drop("last_created").with(zip(
        "cursor": [state.?cursor.orValue({})].drop("last_created")[0].with(zip(
          result.map(r, has(r.?cursor.id), r.cursor.id),
          // TODO: Replace the line below with the following when kibana.version is at least v8.15.0.
          // result.map(r, has(r.?cursor.id), r.cursor.drop(["id","next"]))
          result.map(r, has(r.?cursor.id), [r.cursor].drop(["id","next"])[0])
        )),
        // The work_list does contain the next link since this
        // must be processed within the current eval loop.
        "work_list": result.map(r, has(r.?cursor.next), {
          "id": r.cursor.id,
          "next": r.cursor.next,
        }),
        "events": result.map(r, r.events).flatten(),
        "want_more": result.exists(r, r.want_more),
      })
    )
  )
# Tags attached to published events: the two opt-in behaviour flags
# followed by every user-supplied tag.
tags:
{{#if preserve_original_event}}
  - preserve_original_event
{{/if}}
{{#if preserve_duplicate_custom_fields}}
  - preserve_duplicate_custom_fields
{{/if}}
{{#each tags as |tag|}}
  - {{tag}}
{{/each}}
# Rendered only when the user-supplied tags contain "forwarded".
{{#contains "forwarded" tags}}
publisher_pipeline.disable_host: true
{{/contains}}
# User-supplied processors are inserted verbatim below the key.
{{#if processors}}
processors:
{{processors}}
{{/if}}