packages/cloudflare/data_stream/audit/agent/stream/cel.yml.hbs (158 lines of code) (raw):
config_version: "2"
interval: {{interval}}
resource.tracer:
enabled: {{enable_request_tracer}}
filename: "../../logs/cel/http-request-trace-*.ndjson"
maxbackups: 5
{{#if proxy_url}}
resource.proxy_url: {{proxy_url}}
{{/if}}
{{#if ssl}}
resource.ssl: {{ssl}}
{{/if}}
{{#if http_client_timeout}}
resource.timeout: {{http_client_timeout}}
{{/if}}
resource.url: {{api_url}}
state:
account_id: {{account}}
{{#if per_page}}
per_page: {{per_page}}
{{/if}}
{{#if initial_interval}}
initial_interval: {{initial_interval}}
{{/if}}
{{#if auth_token}}
auth_token: {{auth_token}}
{{/if}}
{{#if auth_key}}
{{#if auth_email}}
auth_key: {{auth_key}}
auth_email: {{auth_email}}
{{/if}}
{{/if}}
redact:
fields:
- auth_token
- auth_key
program: |
(
has(state.want_more) && state.want_more ?
state
:
state.with({
"cursor" : {
"page": 1,
"last_timestamp": state.?cursor.last_timestamp.orValue((now - duration(state.?initial_interval.orValue("720h"))).format(time_layout.RFC3339Nano)),
},
"per_page" : state.?per_page.orValue(100),
})
).as(state, state.with(get_request(
state.url.trim_right("/") + "/client/v4/accounts/" + state.account_id + "/audit_logs?" + {
"since": [state.cursor.last_timestamp],
"page": [string(state.cursor.page)],
"per_page": [string(state.per_page)],
"direction": ["asc"],
}.format_query()
)).with({
"Header": {
"Content-Type": ["application/json"],
?"Authorization": has(state.auth_token) ?
optional.of(["Bearer " + state.auth_token])
:
optional.none(),
?"X-Auth-Email": has(state.auth_email) && !has(state.auth_token) ?
optional.of([state.auth_email])
:
optional.none(),
?"X-Auth-Key": has(state.auth_key) && !has(state.auth_token) ?
optional.of([state.auth_key])
:
optional.none(),
}
}).do_request().as(resp, resp.StatusCode == 200 ?
resp.Body.decode_json().as(body, {
"events": body.result.map(e, {
"message": e.encode_json(),
}),
"want_more": has(body.result) && size(body.result) == state.per_page,
"cursor": (
(has(body.result) && size(body.result) == state.per_page ?
{
"page": int(state.cursor.page) + 1,
"last_timestamp": state.cursor.last_timestamp,
}
:
{
"page": 1,
"last_timestamp": (
has(body.result) && size(body.result) > 0 ?
(
(timestamp(body.result.map(e, e.when).map(t, timestamp(t)).max().as(batch_last_timestamp,
(batch_last_timestamp > timestamp(state.cursor.last_timestamp) ?
batch_last_timestamp
:
timestamp(state.cursor.last_timestamp))
))).format(time_layout.RFC3339Nano)
)
:
state.cursor.last_timestamp
)
}
)
),
"account_id": state.account_id,
"per_page": state.per_page,
?"auth_token": state.?auth_token,
?"auth_email": state.?auth_email,
?"auth_key": state.?auth_key,
})
:
{
"events": {
"error": {
"code": string(resp.StatusCode),
"id": string(resp.Status),
"message": "GET " + state.url.trim_right("/") + "/client/v4/accounts/" + state.account_id + "/audit_logs: "+(
size(resp.Body) != 0 ?
string(resp.Body)
:
string(resp.Status) + ' (' + string(resp.StatusCode) + ')'
),
},
},
"want_more": false,
"cursor": {
"last_timestamp": state.cursor.last_timestamp,
"page": state.cursor.page,
},
"account_id": state.account_id,
"per_page": state.per_page,
?"auth_token": state.?auth_token,
?"auth_email": state.?auth_email,
?"auth_key": state.?auth_key,
}
)
)
{{#if tags.length}}
tags:
{{else if preserve_original_event}}
tags:
{{/if}}
{{#each tags as |tag i|}}
- {{tag}}
{{/each}}
{{#if preserve_original_event}}
- preserve_original_event
{{/if}}
{{#contains "forwarded" tags}}
publisher_pipeline.disable_host: true
{{/contains}}
processors:
- add_fields:
target: _config
fields:
account_id: {{account}}
{{#if processors}}
{{processors}}
{{/if}}