packages/google_workspace/data_stream/calendar/agent/stream/cel.yml.hbs (117 lines of code) (raw):

config_version: 2 interval: {{interval}} resource.tracer: enabled: {{enable_request_tracer}} filename: "../../logs/cel/http-request-trace-*.ndjson" maxbackups: 5 maxsize: 5 {{#if proxy_url}} resource.proxy_url: {{proxy_url}} {{/if}} {{#if ssl}} resource.ssl: {{ssl}} {{/if}} {{#if http_client_timeout}} resource.timeout: {{http_client_timeout}} {{/if}} auth.oauth2: provider: google google: jwt_file: {{jwt_file}} jwt_json: {{jwt_json}} delegated_account: {{delegated_account}} scopes: - https://www.googleapis.com/auth/admin.reports.audit.readonly resource.url: {{api_host}} state: user_key: {{user_key}} initial_interval: {{initial_interval}} lag_time: {{lag_time}} batch_size: {{batch_size}} application_name_for_url: {{application_name_for_url}} redact: fields: ~ program: | ( state.?want_more.orValue(false) ? state : state.with({ "start_time": state.?cursor.last_timestamp.orValue( (now - duration(state.initial_interval) - duration(state.lag_time)).format(time_layout.RFC3339) ), "end_time": (now - duration(state.lag_time)).format(time_layout.RFC3339), }) ).as(state, state.with( request( "GET", state.url.trim_right("/") + "/admin/reports/v1/activity/users/" + state.user_key + "/applications/" + state.application_name_for_url + "?"+ { "maxResults": [string(state.batch_size)], "endTime": [state.end_time], "startTime": [state.start_time], ?"pageToken": state.?next.page_token.optMap(v, [v]), }.format_query() ).do_request().as(resp, resp.StatusCode == 200 ? resp.Body.decode_json().as(body,{ "events": ( has(body.items) ? body.items.map(item, item.events.map(event, item.drop("events").as(root, {"message":root.with({"events": event}).encode_json()} ))).flatten() : [] ), // Update the cursor only if it is the last page to ensure any interrupted // interval restarts from the last known cursor (`state.cursor.last_timestamp`), // avoiding loss of data. "cursor": { ?"last_timestamp": ( has(body.nextPageToken) ? state.?cursor.last_timestamp : optional.of(state.end_time) ) }, "want_more": has(body.nextPageToken), // The provided page token must be generated within the same time interval as the request. // Using a page token from a previous interval may lead to unexpected results. // The current implementation prevents a page token from being carried over to a subsequent interval. "next": { ?"page_token": body.?nextPageToken, } }) : { "events": { "error": { "code": string(resp.StatusCode), "id": string(resp.Status), "message": "GET "+state.url.trim_right("/")+"/admin/reports/v1/activity/users/"+state.user_key+"/applications/" + state.application_name_for_url + ":"+( size(resp.Body) != 0 ? string(resp.Body) : string(resp.Status) + ' (' + string(resp.StatusCode) + ')' ), }, }, "want_more": false, } ) )) tags: {{#if preserve_original_event}} - preserve_original_event {{/if}} {{#if preserve_duplicate_custom_fields}} - preserve_duplicate_custom_fields {{/if}} {{#each tags as |tag|}} - {{tag}} {{/each}} {{#contains "forwarded" tags}} publisher_pipeline.disable_host: true {{/contains}} {{#if processors}} processors: {{processors}} {{/if}}