packages/snyk/data_stream/audit_logs/agent/stream/cel.yml.hbs (213 lines of code) (raw):

# CEL input configuration for the Snyk audit logs data stream.
#
# The program below runs once per interval (and again immediately while
# want_more is true). On each evaluation it:
#   1. builds a list of work items (pending next-page links, all orgs,
#      a legacy single cursor, a multi-org cursor, or a fresh collection),
#   2. issues one audit_logs/search request per work item,
#   3. folds the responses into events, a per-ID cursor and a work_list of
#      next-page links to follow on the next evaluation.
config_version: 2
interval: {{interval}}
{{#if enable_request_tracer}}
resource.tracer.filename: "../../logs/cel/http-request-trace-*.ndjson"
# CEL input tracer options live under resource.tracer.*, not request.tracer.*.
resource.tracer.maxbackups: 5
{{/if}}
{{#if proxy_url}}
resource.proxy_url: {{proxy_url}}
{{/if}}
{{#if ssl}}
resource.ssl: {{ssl}}
{{/if}}
{{#if http_client_timeout}}
resource.timeout: {{http_client_timeout}}
{{/if}}
resource.url: {{url}}
state:
  initial_interval: {{initial_interval}}
  end_point_type: {{audit_type}}
  # Keep version in sync with the value in the README.
  version: "2024-04-29"
  audit_id: {{audit_id}}
  api_token: {{api_token}}
  {{#if first_interval}}
  lookback: {{first_interval}}
  {{/if}}
  {{#if user_id}}
  user_id: {{user_id}}
  {{/if}}
  {{#if project_id}}
  project_id: {{project_id}}
  {{/if}}
  {{#if event}}
  # Read by the program as state.event_filter; keep the two names in sync.
  event_filter:
  {{#each event as |e|}}
    - {{e}}
  {{/each}}
  {{/if}}
  {{#if batch_size}}
  size: {{batch_size}}
  {{/if}}
  want_more: false
redact:
  fields:
    - audit_id
    - api_token
program: |
  state.with(
    {
      "Header": {
        "Accept": ["application/vnd.api+json"],
        "Authorization": ["Token " + state.api_token],
      }
    }.as(auth_header,
      (
        has(state.work_list) ?
          // We have a work-list, do that first.
          // Work-lists do not allow partial requests on a work-list.
          // I think this needs to allow head()/tail() approach. This
          // will be possible when kibana.version is 8.15 or better.
          state.work_list
        : (state.audit_id == "ALL" && state.end_point_type == "/rest/orgs/") ?
          // Otherwise, we have a multi-org request...
          get_request(
            state.url.trim_right("/") + "/rest/orgs?" + {
              "version": [state.version],
              // The /rest/orgs endpoint returns between 10 and 100 results per
              // request (multiples of 10 only), defaulting to 10. For simplicity
              // we'll initially always limit to 100. In future we can paginate
              // over multiple requests if there is demand, possibly with a
              // configurable limit.
              "limit": ['100'],
            }.format_query()
          ).with(auth_header).do_request().as(resp, resp.StatusCode != 200 ?
            // A failed org listing yields no work items for this evaluation;
            // the existing cursor is left untouched by the fold below.
            []
          :
            // TODO: Remove unnecessary bytes conversion when kibana.version is at least v8.15.0.
            bytes(resp.Body).decode_json().data.map(org, {
              "id": org.id,
              // Migrating from single org to multi-org will lose cursor last_created.
              // This cannot be worked around since the multi-org mark clobbers
              // the ID and we cannot know which of the new orgs corresponds to the
              // existing last_created value.
              ?"last_created": state.?cursor[org.id].last_created,
            })
          )
        : has(state.?cursor.last_created) ?
          // ... a single legacy cursor, ...
          [{
            "id": state.audit_id,
            "last_created": state.cursor.last_created,
          }]
        : has(state.cursor) && state.end_point_type == "/rest/orgs/" ?
          // ... a multi-org cursor, ...
          state.cursor.map(audit_id, state.cursor[audit_id].with({"id": audit_id}))
        :
          // ... or a new collection.
          [{"id": state.audit_id}]
      ).map(item,
        get_request(
          // Follow the stored next-page link when the work item has one,
          // otherwise build a fresh search query for this ID.
          state.url.trim_right("/") + item.?next.orValue(
            state.end_point_type + item.id + "/audit_logs/search?" + {
              "version": [state.version],
              "sort_order": ['ASC'],
              ?"from": has(item.last_created) ?
                // Step past the last event by the smallest
                // experimentally determined interval. Is this safe?
                // The alternative is to recollect the last event of
                // the previous collection.
                optional.of([string(timestamp(item.last_created)+duration("1us"))])
              : has(state.lookback) ?
                optional.of([string(now-duration(state.lookback))])
              :
                optional.none(),
              ?"size": has(state.size) ?
                optional.of([string(int(state.size))])
              :
                optional.none(),
              ?"user_id": has(state.user_id) ?
                optional.of([state.user_id])
              :
                optional.none(),
              ?"project_id": has(state.project_id) ?
                optional.of([state.project_id])
              :
                optional.none(),
              // The state key is event_filter (see the state block); reading
              // any other name silently drops the configured filter.
              ?"events": state.?event_filter,
            }.format_query()
          )
        ).with(auth_header).do_request().as(resp, resp.StatusCode != 200 ?
          {
            "id": item.id,
            "events": [{
              "error": {
                "code": string(resp.StatusCode),
                "id": string(resp.Status),
                "message": size(resp.Body) != 0 ?
                  string(resp.Body)
                :
                  string(resp.Status) + ' (' + string(resp.StatusCode) + ')',
              }
            }],
            "want_more": false,
          }
        :
          // TODO: Remove unnecessary bytes conversion when kibana.version is at least v8.15.0.
          bytes(resp.Body).decode_json().as(body, !has(body.?data.items) ?
            {
              "id": item.id,
              "events":[],
              "want_more": false,
            }
          :
            {
              "id": item.id,
              // The map variable is named ev so that it does not shadow the
              // outer work item, which is still needed below.
              "events": body.data.items.map(ev, {
                "message": ev.encode_json()
              }),
              "cursor": {
                "id": item.id,
                ?"next": body.?links.next,
                // This could be
                //
                //  size(body.data.items) == 0 ?
                //    [item.?last_created.orValue(now)]
                //  :
                //    body.data.items[size(body.data.items)-1]
                //
                // if sort_order=ASC is reliable.
                "last_created": body.data.items.map(ev,
                  has(ev.created), timestamp(ev.created)
                ).as(times, size(times) == 0 ? item.?last_created.orValue(now) : times.max()),
              },
              "want_more": has(body.?links.next),
            }
          )
        )
      ).as(result, {
        // The cursor cannot contain the next link since it may
        // be stale by the time we revisit the cursor elements.
        // Make sure we remove legacy cursor time in last_created.
        // TODO: Replace the line below with the following when kibana.version is at least v8.15.0.
        // "cursor": state.?cursor.orValue({}).drop("last_created").with(zip(
        "cursor": [state.?cursor.orValue({})].drop("last_created")[0].with(zip(
          result.map(r, has(r.?cursor.id), r.cursor.id),
          // TODO: Replace the line below with the following when kibana.version is at least v8.15.0.
          // result.map(r, has(r.?cursor.id), r.cursor.drop(["id","next"]))
          result.map(r, has(r.?cursor.id), [r.cursor].drop(["id","next"])[0])
        )),
        // The work_list does contain the next link since this
        // must be processed within the current eval loop.
        "work_list": result.map(r, has(r.?cursor.next), {
          "id": r.cursor.id,
          "next": r.cursor.next,
        }),
        "events": result.map(r, r.events).flatten(),
        "want_more": result.exists(r, r.want_more),
      })
    )
  )
tags:
{{#if preserve_original_event}}
  - preserve_original_event
{{/if}}
{{#if preserve_duplicate_custom_fields}}
  - preserve_duplicate_custom_fields
{{/if}}
{{#each tags as |tag|}}
  - {{tag}}
{{/each}}
{{#contains "forwarded" tags}}
publisher_pipeline.disable_host: true
{{/contains}}
{{#if processors}}
processors:
{{processors}}
{{/if}}