packages/o365/data_stream/audit/agent/stream/cel.yml.hbs (292 lines of code) (raw):
# How often the CEL program is evaluated.
interval: {{interval}}

# OAuth2 client-credentials flow against Azure.
auth.oauth2:
  client.id: {{client_id}}
  client.secret: {{client_secret}}
  provider: azure
  scopes:
{{#each token_scopes}}
    - {{this}}
{{/each}}
  endpoint_params:
    grant_type: client_credentials
{{#if token_url}}
  # An explicit token endpoint base takes precedence over the tenant shortcut.
  token_url: {{token_url}}/{{azure_tenant_id}}/oauth2/v2.0/token
{{else if azure_tenant_id}}
  azure.tenant_id: {{azure_tenant_id}}
{{/if}}

# Base URL of the API; exposed to the program as state.url.
resource.url: {{url}}
{{#if resource_ssl}}
resource.ssl:
  {{resource_ssl}}
{{/if}}
{{#if resource_timeout}}
resource.timeout: {{resource_timeout}}
{{/if}}
{{#if resource_proxy_url}}
resource.proxy_url: {{resource_proxy_url}}
{{/if}}

# Optional HTTP retry tuning.
{{#if resource_retry_max_attempts}}
resource.retry.max_attempts: {{resource_retry_max_attempts}}
{{/if}}
{{#if resource_retry_wait_min}}
resource.retry.wait_min: {{resource_retry_wait_min}}
{{/if}}
{{#if resource_retry_wait_max}}
resource.retry.wait_max: {{resource_retry_wait_max}}
{{/if}}

# Optional HTTP redirect tuning.
{{#if resource_redirect_forward_headers}}
resource.redirect.forward_headers: {{resource_redirect_forward_headers}}
{{/if}}
{{#if resource_redirect_headers_ban_list}}
resource.redirect.headers_ban_list:
{{#each resource_redirect_headers_ban_list}}
  - {{this}}
{{/each}}
{{/if}}
{{#if resource_redirect_max_redirects}}
resource.redirect.max_redirects: {{resource_redirect_max_redirects}}
{{/if}}

# Optional request rate limiting.
{{#if resource_rate_limit_limit}}
resource.rate_limit.limit: {{resource_rate_limit_limit}}
{{/if}}
{{#if resource_rate_limit_burst}}
resource.rate_limit.burst: {{resource_rate_limit_burst}}
{{/if}}

# HTTP request trace log, toggled by enable_request_tracer.
resource.tracer:
  enabled: {{enable_request_tracer}}
  filename: "../../logs/cel/http-request-trace-*.ndjson"
  maxbackups: 10
  maxsize: 5

# Initial program state. The program copies `base` into its result unchanged,
# so these settings remain available on every evaluation.
state:
  want_more: false
  base:
    tenant_id: "{{azure_tenant_id}}"
    # Look-back duration used for the first listing when no cursor exists.
    list_contents_start_time: "{{initial_interval}}"
    # Width of each listing window; the program caps it at 24h.
    batch_interval: "{{batch_interval}}"
    # Comma-separated content types; the program splits and trims them.
    content_types: "{{content_types}}"

# State fields to redact (see the CEL input `redact.fields` option).
redact:
  fields:
    - base.tenant_id
program: |-
  // For every configured content type (comma-separated in state.base.content_types):
  //   1. POST to subscriptions/start to start the subscription.
  //   2. GET subscriptions/content to list content blobs, either from a saved
  //      next-page URI or for a startTime/endTime window.
  //   3. GET each listed contentUri that has not expired and collect its records.
  //   4. Return the collected records under "events", plus per-content-type
  //      cursor entries (content_created_at, next_page, want_more_content) that
  //      the next evaluation uses to pick its listing window.
  state.base.content_types.split(",").map(content_type_raw,
    content_type_raw.trim_space()
  ).map(content_type,
    request(
      "POST",
      state.url.trim_right("/") + "/api/v1.0/" + state.base.tenant_id + "/activity/feed/subscriptions/start?" +
      {
        "contentType": [content_type],
        "PublisherIdentifier": [state.base.tenant_id],
      }.format_query()
    ).do_request().as(start_subs_resp,
      start_subs_resp.Body.decode_json().as(start_subs_resp_body,
        (
          has(start_subs_resp_body.status) && start_subs_resp_body.status == "enabled" ||
          has(start_subs_resp_body.error) && has(start_subs_resp_body.error.code) &&
          start_subs_resp_body.error.code == "AF20024"
        ) ?
          // When start-subscription API returns success or if already started subscription,
          // build the list-contents request. The configured batch interval is capped at 24h.
          duration(state.base.batch_interval).as(batch_interval,
            min(duration("24h"), batch_interval).as(batch_interval,
              request(
                "GET",
                (
                  state.want_more && has(state.?cursor.content_types_state_as_list) &&
                  size(state.cursor.content_types_state_as_list.filter(e, e.content_type == content_type)) > 0 &&
                  state.cursor.content_types_state_as_list.filter(e, e.content_type == content_type)[0].next_page != ""
                ) ?
                  // if NextPageUri exists
                  state.cursor.content_types_state_as_list.filter(e, e.content_type == content_type)[0].next_page
                :
                (
                  has(state.?cursor.content_types_state_as_list) &&
                  size(state.cursor.content_types_state_as_list.filter(e, e.content_type == content_type)) > 0
                ) ?
                  // if NextPageUri does not exist, but content_type_state_created_at exists in state
                  state.cursor.content_types_state_as_list.filter(e, e.content_type == content_type).as(content_type_state,
                    content_type_state[0].content_created_at.as(content_type_state_created_at,
                      // if saved time inside state is more than 7 days old, then change it to 7 days.
                      max(
                        // The 168h (7d) API age limit is expressed as 167h55m to prevent API call
                        // delay from causing a call to fail.
                        now - duration("167h55m"),
                        content_type_state_created_at.parse_time(time_layout.RFC3339)
                      ).as(state_created_at_calc,
                        state.url.trim_right("/") + "/api/v1.0/" + state.base.tenant_id + "/activity/feed/subscriptions/content?" +
                        {
                          "contentType": [content_type],
                          "PublisherIdentifier": [state.base.tenant_id],
                          "startTime": [string(min(now - duration("1s"), state_created_at_calc + duration("1s")))],
                          "endTime": [string(min(now, state_created_at_calc + batch_interval))],
                        }.format_query()
                      )
                    )
                  )
                :
                  // initial run when no cursor state exists i.e., polling from initial_interval
                  state.url.trim_right("/") + "/api/v1.0/" + state.base.tenant_id + "/activity/feed/subscriptions/content?" +
                  {
                    "contentType": [content_type],
                    "PublisherIdentifier": [state.base.tenant_id],
                    "startTime": [string(min(now - duration("1s"), now - duration(state.base.list_contents_start_time)))],
                    "endTime": [string(min(now, now - duration(state.base.list_contents_start_time) + batch_interval))],
                  }.format_query()
              )
            )
          ).do_request().as(list_contents_resp,
            list_contents_resp.Body.decode_json().as(list_contents_resp_body,
              (
                type(list_contents_resp_body) != map && size(list_contents_resp_body) > 0 &&
                has(list_contents_resp_body[0].contentUri) && list_contents_resp_body[0].contentUri != "" &&
                has(list_contents_resp_body[0].contentCreated) && list_contents_resp_body[0].contentCreated != ""
              ) ?
                // contents exist to consume
                list_contents_resp_body.map(l1,
                  // Only fetch blobs whose contentExpiration has not passed yet.
                  (has(l1.contentExpiration) && l1.contentExpiration.parse_time(time_layout.RFC3339) >= now) ?
                    request("GET", l1.contentUri).do_request().as(content_resp,
                      (has(content_resp.StatusCode) && content_resp.StatusCode == 200 && size(content_resp.Body) > 0) ?
                        // Wrap every record of the blob as {"o365audit": record}.
                        content_resp.Body.decode_json().map(content_resp_body,
                          content_resp_body.with({"copy": {"o365audit": content_resp_body}})
                        ).map(content_resp_body_with_copy,
                          content_resp_body_with_copy.copy
                        ).flatten().drop_empty().as(contents,
                          {
                            "events_per_content_type": contents,
                            "content_type": content_type,
                            // if 'contentCreated' is older than 167h55m, change it to 167h55m.
                            "content_created_at": {"temp": list_contents_resp_body}.collate("temp.contentCreated").max().as(temp_max,
                              (temp_max.parse_time(time_layout.RFC3339) > now - duration("167h55m")) ?
                                temp_max
                              :
                                (now - duration("167h55m")).format(time_layout.RFC3339)
                            ),
                            // The next-page header is looked up under both spellings.
                            "next_page": (has(list_contents_resp.?Header.NextPageUri) && list_contents_resp.Header.NextPageUri.size() > 0) ?
                              (list_contents_resp.Header.NextPageUri[0])
                            : (has(list_contents_resp.?Header.Nextpageuri) && list_contents_resp.Header.Nextpageuri.size() > 0) ?
                              (list_contents_resp.Header.Nextpageuri[0])
                            :
                              "",
                            // keep fetching more if (nextpageuri exists) or (max time returned date != today's date)
                            "want_more_content": has(list_contents_resp.Header) &&
                              (
                                has(list_contents_resp.Header.NextPageUri) && list_contents_resp.Header.NextPageUri.size() > 0 ||
                                has(list_contents_resp.Header.Nextpageuri) && list_contents_resp.Header.Nextpageuri.size() > 0
                              ) || {"temp": list_contents_resp_body}.collate("temp.contentCreated").max().split("T").as(t, t.size() > 1 &&
                                t[0] != now.format("2006-01-02")),
                          }
                        )
                      :
                        // Blob fetch did not return a non-empty 200 response: no events,
                        // but cursor fields are still recorded.
                        {
                          "events_per_content_type": [],
                          "content_type": content_type,
                          "content_created_at": {"temp": list_contents_resp_body}.collate("temp.contentCreated").max().as(temp_max,
                            (temp_max.parse_time(time_layout.RFC3339) > now - duration("167h55m")) ?
                              temp_max
                            :
                              (now - duration("167h55m")).format(time_layout.RFC3339)
                          ),
                          "next_page": (has(list_contents_resp.?Header.NextPageUri) && list_contents_resp.Header.NextPageUri.size() > 0) ?
                            (list_contents_resp.Header.NextPageUri[0])
                          : (has(list_contents_resp.?Header.Nextpageuri) && list_contents_resp.Header.Nextpageuri.size() > 0) ?
                            (list_contents_resp.Header.Nextpageuri[0])
                          :
                            "",
                        }
                    )
                  :
                    // Blob has expired (or has no contentExpiration): no events,
                    // but cursor fields are still recorded.
                    {
                      "events_per_content_type": [],
                      "content_type": content_type,
                      "content_created_at": {"temp": list_contents_resp_body}.collate("temp.contentCreated").max().as(temp_max,
                        (temp_max.parse_time(time_layout.RFC3339) > now - duration("167h55m")) ?
                          temp_max
                        :
                          (now - duration("167h55m")).format(time_layout.RFC3339)
                      ),
                      "next_page": (has(list_contents_resp.?Header.NextPageUri) && list_contents_resp.Header.NextPageUri.size() > 0) ?
                        (list_contents_resp.Header.NextPageUri[0])
                      : (has(list_contents_resp.?Header.Nextpageuri) && list_contents_resp.Header.Nextpageuri.size() > 0) ?
                        (list_contents_resp.Header.Nextpageuri[0])
                      :
                        "",
                    }
                )
              :
                // contents does not exist, or is empty array
                list_contents_resp.Request.URL.parse_url().RawQuery.parse_query().as(reqQuery,
                  [
                    {
                      "events_per_content_type": (size(list_contents_resp_body) == 0) ? [] : [list_contents_resp_body],
                      "content_type": content_type,
                      // Advance to the requested endTime on a 200 response, otherwise
                      // stay at the requested startTime, otherwise fall back to the
                      // saved cursor value, otherwise to the initial look-back.
                      "content_created_at": (
                        has(list_contents_resp.StatusCode) && has(reqQuery.endTime) &&
                        list_contents_resp.StatusCode == 200 && reqQuery.endTime.size() > 0
                      ) ?
                        (reqQuery.endTime[0])
                      : (has(reqQuery.startTime) && reqQuery.startTime.size() > 0) ?
                        (reqQuery.startTime[0])
                      : (
                        // Guard the cursor lookup: the cursor may be absent or may hold
                        // no entry for this content type, and indexing [0] would then
                        // fail the whole evaluation.
                        has(state.cursor) && has(state.cursor.content_types_state_as_list) &&
                        size(state.cursor.content_types_state_as_list.filter(e, e.content_type == content_type)) > 0 &&
                        has(
                          state.cursor.content_types_state_as_list.filter(e,
                            e.content_type == content_type
                          )[0].content_created_at
                        )
                      ) ?
                        state.cursor.content_types_state_as_list.filter(e,
                          e.content_type == content_type
                        )[0].content_created_at
                      :
                        string(now - duration(state.base.list_contents_start_time)),
                      "next_page": "",
                      // Keep going while the window that was just listed ends on a
                      // date other than today's.
                      "want_more_content": (
                        has(list_contents_resp.StatusCode) && has(reqQuery.endTime) &&
                        list_contents_resp.StatusCode == 200 && reqQuery.endTime.size() > 0 &&
                        reqQuery.endTime[0].split("T").as(t, t.size() > 0 &&
                          t[0] != now.format("2006-01-02"))
                      ),
                    },
                  ]
                )
            )
          )
        :
          // When start-subscription API produces error, such as Authentication error.
          [
            {
              "events_per_content_type": [],
              "content_type": content_type,
              // Keep the saved position when the cursor has an entry for this content
              // type. The size check covers a content type that is missing from the
              // cursor, where indexing [0] would otherwise fail the whole evaluation.
              "content_created_at": (
                has(state.cursor) && has(state.cursor.content_types_state_as_list) &&
                size(state.cursor.content_types_state_as_list.filter(e, e.content_type == content_type)) > 0
              ) ?
                state.cursor.content_types_state_as_list.filter(e, e.content_type == content_type)[0].content_created_at
              :
                string(now - duration(state.base.list_contents_start_time)),
              "next_page": "",
              "want_more_content": false,
            },
          ]
      )
    )
  ).flatten().drop_empty().as(events_list,
    {
      "url": state.url,
      "base": state.base,
      "events": events_list.collate("events_per_content_type"),
      // Re-evaluate immediately if any result asked for more.
      "want_more": events_list.collate("want_more_content").filter(e, e == true).size() > 0,
      "cursor": {
        // Every result minus its events; later evaluations read the first
        // entry matching a content type.
        "content_types_state_as_list": events_list.drop(["events_per_content_type"]),
      },
    }
  )
# Event tags. The preserve_original_event tag is emitted whenever that option
# is enabled, including when no user-supplied tags are configured.
{{#if tags}}
tags:
{{#if preserve_original_event}}
  - preserve_original_event
{{/if}}
{{#each tags as |tag|}}
  - {{tag}}
{{/each}}
{{else if preserve_original_event}}
tags:
  - preserve_original_event
{{/if}}
# Set when the "forwarded" tag is configured.
{{#contains "forwarded" tags}}
publisher_pipeline.disable_host: true
{{/contains}}
# User-supplied processors, if any.
{{#if processors}}
processors:
{{processors}}
{{/if}}