packages/cloudflare/data_stream/audit/agent/stream/cel.yml.hbs (158 lines of code) (raw):

config_version: "2" interval: {{interval}} resource.tracer: enabled: {{enable_request_tracer}} filename: "../../logs/cel/http-request-trace-*.ndjson" maxbackups: 5 {{#if proxy_url}} resource.proxy_url: {{proxy_url}} {{/if}} {{#if ssl}} resource.ssl: {{ssl}} {{/if}} {{#if http_client_timeout}} resource.timeout: {{http_client_timeout}} {{/if}} resource.url: {{api_url}} state: account_id: {{account}} {{#if per_page}} per_page: {{per_page}} {{/if}} {{#if initial_interval}} initial_interval: {{initial_interval}} {{/if}} {{#if auth_token}} auth_token: {{auth_token}} {{/if}} {{#if auth_key}} {{#if auth_email}} auth_key: {{auth_key}} auth_email: {{auth_email}} {{/if}} {{/if}} redact: fields: - auth_token - auth_key program: | ( has(state.want_more) && state.want_more ? state : state.with({ "cursor" : { "page": 1, "last_timestamp": state.?cursor.last_timestamp.orValue((now - duration(state.?initial_interval.orValue("720h"))).format(time_layout.RFC3339Nano)), }, "per_page" : state.?per_page.orValue(100), }) ).as(state, state.with(get_request( state.url.trim_right("/") + "/client/v4/accounts/" + state.account_id + "/audit_logs?" + { "since": [state.cursor.last_timestamp], "page": [string(state.cursor.page)], "per_page": [string(state.per_page)], "direction": ["asc"], }.format_query() )).with({ "Header": { "Content-Type": ["application/json"], ?"Authorization": has(state.auth_token) ? optional.of(["Bearer " + state.auth_token]) : optional.none(), ?"X-Auth-Email": has(state.auth_email) && !has(state.auth_token) ? optional.of([state.auth_email]) : optional.none(), ?"X-Auth-Key": has(state.auth_key) && !has(state.auth_token) ? optional.of([state.auth_key]) : optional.none(), } }).do_request().as(resp, resp.StatusCode == 200 ? resp.Body.decode_json().as(body, { "events": body.result.map(e, { "message": e.encode_json(), }), "want_more": has(body.result) && size(body.result) == state.per_page, "cursor": ( (has(body.result) && size(body.result) == state.per_page ? { "page": int(state.cursor.page) + 1, "last_timestamp": state.cursor.last_timestamp, } : { "page": 1, "last_timestamp": ( has(body.result) && size(body.result) > 0 ? ( (timestamp(body.result.map(e, e.when).map(t, timestamp(t)).max().as(batch_last_timestamp, (batch_last_timestamp > timestamp(state.cursor.last_timestamp) ? batch_last_timestamp : timestamp(state.cursor.last_timestamp)) ))).format(time_layout.RFC3339Nano) ) : state.cursor.last_timestamp ) } ) ), "account_id": state.account_id, "per_page": state.per_page, ?"auth_token": state.?auth_token, ?"auth_email": state.?auth_email, ?"auth_key": state.?auth_key, }) : { "events": { "error": { "code": string(resp.StatusCode), "id": string(resp.Status), "message": "GET " + state.url.trim_right("/") + "/client/v4/accounts/" + state.account_id + "/audit_logs: "+( size(resp.Body) != 0 ? string(resp.Body) : string(resp.Status) + ' (' + string(resp.StatusCode) + ')' ), }, }, "want_more": false, "cursor": { "last_timestamp": state.cursor.last_timestamp, "page": state.cursor.page, }, "account_id": state.account_id, "per_page": state.per_page, ?"auth_token": state.?auth_token, ?"auth_email": state.?auth_email, ?"auth_key": state.?auth_key, } ) ) {{#if tags.length}} tags: {{else if preserve_original_event}} tags: {{/if}} {{#each tags as |tag i|}} - {{tag}} {{/each}} {{#if preserve_original_event}} - preserve_original_event {{/if}} {{#contains "forwarded" tags}} publisher_pipeline.disable_host: true {{/contains}} processors: - add_fields: target: _config fields: account_id: {{account}} {{#if processors}} {{processors}} {{/if}}