packages/auth0/data_stream/logs/agent/stream/cel.yml.hbs (135 lines of code) (raw):

# CEL input configuration for the Auth0 logs data stream.
#
# On each evaluation the program:
#   1. POSTs the client credentials to <url>/oauth/token to obtain a token.
#   2. GETs a page of logs from <url>/api/v2/logs (or from the stored
#      "next" link) using that token.
#   3. Emits one event per returned log entry and stores the rel="next"
#      link from the Link response header in the cursor.
config_version: 2
interval: {{interval}}
{{#if enable_request_tracer}}
# Tracer options for the CEL input are namespaced under resource.tracer.
resource.tracer.filename: "../../logs/cel/http-request-trace-*.ndjson"
resource.tracer.maxbackups: 5
{{/if}}
{{#if proxy_url}}
resource.proxy_url: {{proxy_url}}
{{/if}}
{{#if ssl}}
resource.ssl: {{ssl}}
{{/if}}
{{#if http_client_timeout}}
resource.timeout: {{http_client_timeout}}
{{/if}}
resource.url: {{url}}
state:
  client_id: {{client_id}}
  client_secret: {{client_secret}}
  # Used to build the initial "from" log ID when no cursor link exists.
  look_back: {{initial_interval}}
  want_more: false
  # Sent as the "take" query parameter on every logs request.
  take: {{batch_size}}
redact:
  fields:
    - client_secret
program: |
  state.with(
    // Request an access token with the client_credentials grant.
    post(state.url.trim_right("/") + "/oauth/token", "application/json", {
      "client_id": state.client_id,
      "client_secret": state.client_secret,
      "audience": state.url.trim_right("/") + "/api/v2/",
      "grant_type": "client_credentials",
    }.encode_json()).as(auth_resp, auth_resp.StatusCode != 200 ?
      // Token request failed: report a single error event and set
      // want_more to false.
      {
        "events": {
          "error": {
            "code": string(auth_resp.StatusCode),
            "id": string(auth_resp.Status),
            "message": "POST:"+(
              size(auth_resp.Body) != 0 ?
                string(auth_resp.Body)
              :
                string(auth_resp.Status) + ' (' + string(auth_resp.StatusCode) + ')'
            ),
          },
        },
        "want_more": false,
      }
    :
      {
        "Body": bytes(auth_resp.Body).decode_json(),
      }
    ).as(token, has(token.events) ? token :
      // No error events from the token step, so fetch a page of logs.
      get_request(
        // Prefer a "next" link already present in state; otherwise fall
        // back to the cursor link, and finally to a look-back URL.
        state.?next.orValue(
          has(state.?cursor.next) ?
            // Use the cursor next rel link if it exists.
            state.cursor.next.parse_url().as(next, next.with({
              // The next rel link includes the take parameter which the
              // user may have changed, so replace it with the config's
              // value.
              "RawQuery": next.RawQuery.parse_query().with({
                ?"take": has(state.take) ?
                  optional.of([string(state.take)])
                :
                  optional.none(),
              }).format_query()
            }).format_url())
          :
            // Otherwise construct a next rel-ish link to look back.
            state.url.trim_right("/") + "/api/v2/logs?" + {
              ?"take": has(state.take) ?
                optional.of([string(state.take)])
              :
                optional.none(),
              ?"from": has(state.look_back) ?
                // Format a relative timestamp into a log ID.
                optional.of(["900" + (now-duration(state.look_back)).format("20060102150405") + "000000000000000000000000000000000000000"])
              :
                optional.none(),
            }.format_query()
        )
      ).with({
        "Header": {
          // Falls back to "Bearer" and "MISSING" when the token response
          // lacks token_type or access_token respectively.
          "Authorization": [token.?Body.token_type.orValue("Bearer") + " " + token.?Body.access_token.orValue("MISSING")],
          "Accept": ["application/json"],
        }
      }).do_request().as(resp, resp.StatusCode != 200 ?
        // Logs request failed: report a single error event and set
        // want_more to false.
        {
          "events": {
            "error": {
              "code": string(resp.StatusCode),
              "id": string(resp.Status),
              "message": "GET:"+(
                size(resp.Body) != 0 ?
                  string(resp.Body)
                :
                  string(resp.Status) + ' (' + string(resp.StatusCode) + ')'
              ),
            },
          },
          "want_more": false,
        }
      :
        {
          "Body": bytes(resp.Body).decode_json(),
          // Take the first Link header value, split it on ";" and, when one
          // of the parts carries rel="next", keep the part that looks like
          // <http(s)://...> with the angle brackets stripped.
          ?"next": resp.Header.?Link[0].orValue("").as(next, next.split(";").as(attrs, attrs.exists(attr, attr.contains('rel="next"')) ?
            attrs.map(attr, attr.matches("^<https?://"), attr.trim_prefix('<').trim_suffix('>'))[?0]
          :
            optional.none()
          )),
        }.as(result, result.with({
          // One event per log entry, with log_id copied alongside the
          // full entry.
          "events": result.Body.map(e, {"json": {"log_id": e.log_id, "data": e}}),
          "cursor": {
            ?"next": result.?next,
          },
          // Ask for another evaluation only when there is a next link and
          // the current page was not empty.
          "want_more": has(result.next) && size(result.Body) != 0,
        })).drop("Body")
      )
    )
  )
tags:
{{#if preserve_original_event}}
  - preserve_original_event
{{/if}}
{{#each tags as |tag|}}
  - {{tag}}
{{/each}}
{{#contains "forwarded" tags}}
publisher_pipeline.disable_host: true
{{/contains}}
{{#if processors}}
processors:
{{processors}}
{{/if}}