# packages/google_workspace/data_stream/chrome/agent/stream/cel.yml.hbs
# CEL input configuration schema version and the evaluation interval
# supplied by the integration settings.
config_version: 2
interval: {{interval}}
# HTTP request tracing for troubleshooting; switched on or off through the
# `enable_request_tracer` setting. `maxsize` and `maxbackups` bound the
# rotated trace files written under the pattern in `filename`.
resource.tracer:
  enabled: {{enable_request_tracer}}
  filename: "../../logs/cel/http-request-trace-*.ndjson"
  maxsize: 5
  maxbackups: 5
# Optional HTTP client overrides. Each key is rendered only when the
# corresponding integration variable is set; otherwise the key is omitted
# entirely so the input falls back to its own default.
{{#if http_client_timeout}}
resource.timeout: {{http_client_timeout}}
{{/if}}
{{#if proxy_url}}
resource.proxy_url: {{proxy_url}}
{{/if}}
{{#if ssl}}
resource.ssl: {{ssl}}
{{/if}}
# OAuth2 authentication using the Google provider. Credentials come from
# either a JWT file path or inline JWT JSON, and requests are made on behalf
# of `delegated_account`. The single requested scope grants read-only access
# to Admin SDK audit reports.
auth.oauth2:
  provider: google
  scopes:
    - https://www.googleapis.com/auth/admin.reports.audit.readonly
  google:
    delegated_account: {{delegated_account}}
    jwt_file: {{jwt_file}}
    jwt_json: {{jwt_json}}
# Base URL of the API host; the program reads it as `state.url`.
resource.url: {{api_host}}
# Initial values handed to the CEL program through `state`.
state:
  # Path components of the request URL:
  # .../activity/users/<user_key>/applications/<application_name_for_url>
  application_name_for_url: {{application_name_for_url}}
  user_key: {{user_key}}
  # Sent as the `maxResults` query parameter.
  batch_size: {{batch_size}}
  # Time-window controls: the first window starts at
  # now - initial_interval - lag_time, and every window ends at now - lag_time.
  initial_interval: {{initial_interval}}
  lag_time: {{lag_time}}
# The redaction list is explicitly declared with no entries: no `state`
# fields are named for redaction.
redact:
  fields: null
# CEL program: fetches one page of activity records per evaluation.
#
# Window selection
#   * `want_more` false/absent (start of a new interval): compute a new
#     window. `start_time` is the stored `cursor.last_timestamp` if present,
#     otherwise now - initial_interval - lag_time; `end_time` is
#     now - lag_time.
#   * `want_more` true (mid-pagination): reuse the current state so that the
#     window stays fixed while pages are followed.
#
# HTTP 200
#   * `events`: one entry per (item, event) pair. Each entry is the item with
#     its `events` array replaced by the single event, JSON-encoded into
#     `message`. An absent `items` field yields an empty list.
#   * `cursor.last_timestamp`: advanced to `end_time` only on the last page.
#   * `next.page_token`: set from `nextPageToken`, or cleared on the last page.
#
# Any other status
#   * `events` holds a single error object, pagination stops, and any saved
#     page token is discarded so it cannot leak into the next interval.
program: |
  (
    state.?want_more.orValue(false) ?
      state
    :
      state.with({
        "start_time": state.?cursor.last_timestamp.orValue(
          (now - duration(state.initial_interval) - duration(state.lag_time)).format(time_layout.RFC3339)
        ),
        "end_time": (now - duration(state.lag_time)).format(time_layout.RFC3339),
      })
  ).as(state, state.with(
    request(
      "GET",
      state.url.trim_right("/") + "/admin/reports/v1/activity/users/" + state.user_key + "/applications/" + state.application_name_for_url + "?"+ {
        "maxResults": [string(state.batch_size)],
        "endTime": [state.end_time],
        "startTime": [state.start_time],
        ?"pageToken": state.?next.page_token.optMap(v, [v]),
      }.format_query()
    ).do_request().as(resp, resp.StatusCode == 200 ?
      resp.Body.decode_json().as(body,{
        "events": (
          has(body.items) ?
            body.items.map(item, item.events.map(event, item.drop("events").as(root,
              {"message":root.with({"events": event}).encode_json()}
            ))).flatten()
          :
            []
        ),
        // Update the cursor only if it is the last page to ensure any interrupted
        // interval restarts from the last known cursor (`state.cursor.last_timestamp`),
        // avoiding loss of data.
        "cursor": {
          ?"last_timestamp": (
            has(body.nextPageToken) ?
              state.?cursor.last_timestamp
            :
              optional.of(state.end_time)
          )
        },
        "want_more": has(body.nextPageToken),
        // The provided page token must be generated within the same time interval as the request.
        // Using a page token from a previous interval may lead to unexpected results.
        // The current implementation prevents a page token from being carried over to a subsequent interval.
        "next": {
          ?"page_token": body.?nextPageToken,
        }
      })
    :
      {
        "events": {
          "error": {
            "code": string(resp.StatusCode),
            "id": string(resp.Status),
            "message": "GET "+state.url.trim_right("/")+"/admin/reports/v1/activity/users/"+state.user_key+"/applications/" + state.application_name_for_url + ":"+(
              size(resp.Body) != 0 ?
                string(resp.Body)
              :
                string(resp.Status) + ' (' + string(resp.StatusCode) + ')'
            ),
          },
        },
        "want_more": false,
        // Drop any page token saved by an earlier page of this interval.
        // Because `want_more` is false the next evaluation computes a new
        // time window; without this reset the stale token would be sent
        // together with that new window.
        "next": {},
      }
    )
  ))
# Event tags: the two optional behaviour tags are emitted first (when their
# settings are enabled), followed by every user-supplied tag in order.
tags:
{{#if preserve_original_event}}
  - preserve_original_event
{{/if}}
{{#if preserve_duplicate_custom_fields}}
  - preserve_duplicate_custom_fields
{{/if}}
{{#each tags as |tag|}}
  - {{tag}}
{{/each}}
# When the user-supplied tags contain "forwarded",
# `publisher_pipeline.disable_host` is set to true.
{{#contains "forwarded" tags}}
publisher_pipeline.disable_host: true
{{/contains}}
# User-defined processors, rendered verbatim when provided.
{{#if processors}}
processors:
{{processors}}
{{/if}}