packages/o365/data_stream/audit/agent/stream/cel.yml.hbs (292 lines of code) (raw):

interval: {{interval}} auth.oauth2: client.id: {{client_id}} client.secret: {{client_secret}} provider: azure scopes: {{#each token_scopes as |token_scope|}} - {{token_scope}} {{/each}} endpoint_params: grant_type: client_credentials {{#if token_url}} token_url: {{token_url}}/{{azure_tenant_id}}/oauth2/v2.0/token {{else if azure_tenant_id}} azure.tenant_id: {{azure_tenant_id}} {{/if}} resource.url: {{url}} {{#if resource_ssl}} resource.ssl: {{resource_ssl}} {{/if}} {{#if resource_timeout}} resource.timeout: {{resource_timeout}} {{/if}} {{#if resource_proxy_url}} resource.proxy_url: {{resource_proxy_url}} {{/if}} {{#if resource_retry_max_attempts}} resource.retry.max_attempts: {{resource_retry_max_attempts}} {{/if}} {{#if resource_retry_wait_min}} resource.retry.wait_min: {{resource_retry_wait_min}} {{/if}} {{#if resource_retry_wait_max}} resource.retry.wait_max: {{resource_retry_wait_max}} {{/if}} {{#if resource_redirect_forward_headers}} resource.redirect.forward_headers: {{resource_redirect_forward_headers}} {{/if}} {{#if resource_redirect_headers_ban_list}} resource.redirect.headers_ban_list: {{#each resource_redirect_headers_ban_list as |item|}} - {{item}} {{/each}} {{/if}} {{#if resource_redirect_max_redirects}} resource.redirect.max_redirects: {{resource_redirect_max_redirects}} {{/if}} {{#if resource_rate_limit_limit}} resource.rate_limit.limit: {{resource_rate_limit_limit}} {{/if}} {{#if resource_rate_limit_burst}} resource.rate_limit.burst: {{resource_rate_limit_burst}} {{/if}} resource.tracer: enabled: {{enable_request_tracer}} filename: "../../logs/cel/http-request-trace-*.ndjson" maxbackups: 10 maxsize: 5 state: want_more: false base: tenant_id: "{{azure_tenant_id}}" list_contents_start_time: "{{initial_interval}}" batch_interval: "{{batch_interval}}" content_types: "{{content_types}}" redact: fields: - base.tenant_id program: |- state.base.content_types.split(",").map(content_type_raw, content_type_raw.trim_space() ).map(content_type, request( "POST", state.url.trim_right("/") + "/api/v1.0/" + state.base.tenant_id + "/activity/feed/subscriptions/start?" + { "contentType": [content_type], "PublisherIdentifier": [state.base.tenant_id], }.format_query() ).do_request().as(start_subs_resp, start_subs_resp.Body.decode_json().as(start_subs_resp_body, ( has(start_subs_resp_body.status) && start_subs_resp_body.status == "enabled" || has(start_subs_resp_body.error) && has(start_subs_resp_body.error.code) && start_subs_resp_body.error.code == "AF20024" ) ? // When start-subscription API returns success or if already started subscription, duration(state.base.batch_interval).as(batch_interval, min(duration("24h"), batch_interval).as(batch_interval, request( "GET", ( state.want_more && has(state.?cursor.content_types_state_as_list) && size(state.cursor.content_types_state_as_list.filter(e, e.content_type == content_type)) > 0 && state.cursor.content_types_state_as_list.filter(e, e.content_type == content_type)[0].next_page != "" ) ? // if NextPageUri exists state.cursor.content_types_state_as_list.filter(e, e.content_type == content_type)[0].next_page : ( has(state.?cursor.content_types_state_as_list) && size(state.cursor.content_types_state_as_list.filter(e, e.content_type == content_type)) > 0 ) ? // if NextPageUri does not exist, but content_type_state_created_at exists in state state.cursor.content_types_state_as_list.filter(e, e.content_type == content_type).as(content_type_state, content_type_state[0].content_created_at.as(content_type_state_created_at, // if saved time inside state is more than 7 days old, then change it to 7 days. max( // The 168h (7d) API age limit is expressed as 167h55m to prevent API call // delay from causing a call to fail. now - duration("167h55m"), content_type_state_created_at.parse_time(time_layout.RFC3339) ).as(state_created_at_calc, state.url.trim_right("/") + "/api/v1.0/" + state.base.tenant_id + "/activity/feed/subscriptions/content?" + { "contentType": [content_type], "PublisherIdentifier": [state.base.tenant_id], "startTime": [string(min(now - duration("1s"), state_created_at_calc + duration("1s")))], "endTime": [string(min(now, state_created_at_calc + batch_interval))], }.format_query() ) ) ) : // initial run when no cursor state exists i.e., polling from initial_interval state.url.trim_right("/") + "/api/v1.0/" + state.base.tenant_id + "/activity/feed/subscriptions/content?" + { "contentType": [content_type], "PublisherIdentifier": [state.base.tenant_id], "startTime": [string(min(now - duration("1s"), now - duration(state.base.list_contents_start_time)))], "endTime": [string(min(now, now - duration(state.base.list_contents_start_time) + batch_interval))], }.format_query() ) ) ).do_request().as(list_contents_resp, list_contents_resp.Body.decode_json().as(list_contents_resp_body, ( type(list_contents_resp_body) != map && size(list_contents_resp_body) > 0 && has(list_contents_resp_body[0].contentUri) && list_contents_resp_body[0].contentUri != "" && has(list_contents_resp_body[0].contentCreated) && list_contents_resp_body[0].contentCreated != "" ) ? // contents exist to consume list_contents_resp_body.map(l1, (has(l1.contentExpiration) && l1.contentExpiration.parse_time(time_layout.RFC3339) >= now) ? request("GET", l1.contentUri).do_request().as(content_resp, (has(content_resp.StatusCode) && content_resp.StatusCode == 200 && size(content_resp.Body) > 0) ? content_resp.Body.decode_json().map(content_resp_body, content_resp_body.with({"copy": {"o365audit": content_resp_body}}) ).map(content_resp_body_with_copy, content_resp_body_with_copy.copy ).flatten().drop_empty().as(contents, { "events_per_content_type": contents, "content_type": content_type, // if 'contentCreated' is older than 167h55m, change it to 167h55m. "content_created_at": {"temp": list_contents_resp_body}.collate("temp.contentCreated").max().as(temp_max, (temp_max.parse_time(time_layout.RFC3339) > now - duration("167h55m")) ? temp_max : (now - duration("167h55m")).format(time_layout.RFC3339) ), "next_page": (has(list_contents_resp.?Header.NextPageUri) && list_contents_resp.Header.NextPageUri.size() > 0) ? (list_contents_resp.Header.NextPageUri[0]) : (has(list_contents_resp.?Header.Nextpageuri) && list_contents_resp.Header.Nextpageuri.size() > 0) ? (list_contents_resp.Header.Nextpageuri[0]) : "", // keep fetching more if (nextpageuri exists) or (max time returned date != today's date) "want_more_content": has(list_contents_resp.Header) && ( has(list_contents_resp.Header.NextPageUri) && list_contents_resp.Header.NextPageUri.size() > 0 || has(list_contents_resp.Header.Nextpageuri) && list_contents_resp.Header.Nextpageuri.size() > 0 ) || {"temp": list_contents_resp_body}.collate("temp.contentCreated").max().split("T").as(t, t.size() > 1 && t[0] != now.format("2006-01-02")), } ) : { "events_per_content_type": [], "content_type": content_type, "content_created_at": {"temp": list_contents_resp_body}.collate("temp.contentCreated").max().as(temp_max, (temp_max.parse_time(time_layout.RFC3339) > now - duration("167h55m")) ? temp_max : (now - duration("167h55m")).format(time_layout.RFC3339) ), "next_page": (has(list_contents_resp.?Header.NextPageUri) && list_contents_resp.Header.NextPageUri.size() > 0) ? (list_contents_resp.Header.NextPageUri[0]) : (has(list_contents_resp.?Header.Nextpageuri) && list_contents_resp.Header.Nextpageuri.size() > 0) ? (list_contents_resp.Header.Nextpageuri[0]) : "", } ) : { "events_per_content_type": [], "content_type": content_type, "content_created_at": {"temp": list_contents_resp_body}.collate("temp.contentCreated").max().as(temp_max, (temp_max.parse_time(time_layout.RFC3339) > now - duration("167h55m")) ? temp_max : (now - duration("167h55m")).format(time_layout.RFC3339) ), "next_page": (has(list_contents_resp.?Header.NextPageUri) && list_contents_resp.Header.NextPageUri.size() > 0) ? (list_contents_resp.Header.NextPageUri[0]) : (has(list_contents_resp.?Header.Nextpageuri) && list_contents_resp.Header.Nextpageuri.size() > 0) ? (list_contents_resp.Header.Nextpageuri[0]) : "", } ) : // contents does not exist, or is empty array list_contents_resp.Request.URL.parse_url().RawQuery.parse_query().as(reqQuery, [ { "events_per_content_type": (size(list_contents_resp_body) == 0) ? [] : [list_contents_resp_body], "content_type": content_type, "content_created_at": ( has(list_contents_resp.StatusCode) && has(reqQuery.endTime) && list_contents_resp.StatusCode == 200 && reqQuery.endTime.size() > 0 ) ? (reqQuery.endTime[0]) : (has(reqQuery.startTime) && reqQuery.startTime.size() > 0) ? (reqQuery.startTime[0]) : has( state.cursor.content_types_state_as_list.filter(e, e.content_type == content_type )[0].content_created_at ) ? state.cursor.content_types_state_as_list.filter(e, e.content_type == content_type )[0].content_created_at : string(now - duration(state.base.list_contents_start_time)), "next_page": "", "want_more_content": ( has(list_contents_resp.StatusCode) && has(reqQuery.endTime) && list_contents_resp.StatusCode == 200 && reqQuery.endTime.size() > 0 && reqQuery.endTime[0].split("T").as(t, t.size() > 0 && t[0] != now.format("2006-01-02")) ), }, ] ) ) ) : // When start-subscription API produces error, such as Authentication error. [ { "events_per_content_type": [], "content_type": content_type, "content_created_at": (has(state.cursor) && has(state.cursor.content_types_state_as_list)) ? state.cursor.content_types_state_as_list.filter(e, e.content_type == content_type)[0].content_created_at : string(now - duration(state.base.list_contents_start_time)), "next_page": "", "want_more_content": false, }, ] ) ) ).flatten().drop_empty().as(events_list, { "url": state.url, "base": state.base, "events": events_list.collate("events_per_content_type"), "want_more": events_list.collate("want_more_content").filter(e, e == true).size() > 0, "cursor": { "content_types_state_as_list": events_list.drop(["events_per_content_type"]), }, } ) {{#if tags}} tags: {{#if preserve_original_event}} - preserve_original_event {{/if}} {{#each tags as |tag|}} - {{tag}} {{/each}} {{/if}} {{#contains "forwarded" tags}} publisher_pipeline.disable_host: true {{/contains}} {{#if processors}} processors: {{processors}} {{/if}}