packages/ess_billing/data_stream/billing/agent/stream/cel.yml.hbs (135 lines of code) (raw):

# CEL input configuration for collecting per-instance billing costs.
# Each evaluation requests one 24h window (from/to, truncated to whole days)
# from the costs/instances endpoint and emits one event per product line item.
config_version: 2
interval: "24h"
{{#if enable_request_tracer}}
resource.tracer.filename: "../../logs/cel/http-request-trace-*.ndjson"
resource.tracer.maxbackups: 5
{{/if}}
{{#if proxy_url}}
resource.proxy_url: {{proxy_url}}
{{/if}}
{{#if ssl}}
resource.ssl: {{ssl}}
{{/if}}
{{#if http_client_timeout}}
resource.timeout: {{http_client_timeout}}
{{/if}}
resource.url: "{{url}}/api/v2/billing/organizations/{{organization_id}}/costs/instances"
# Initial state handed to the program:
#   api_key         - sent in the Authorization header
#   organization_id - copied onto every emitted event
#   lookbehind      - number of days to go back when no cursor exists yet
state:
  api_key: {{api_key}}
  organization_id: "{{organization_id}}"
  lookbehind: {{lookbehind}}
redact:
  fields:
    - api_key
program: |
  state.with(
    {
      // Window start: the stored cursor if there is one, otherwise
      // "lookbehind" days ago. The format/parse round trip through DateOnly
      // truncates the timestamp to the start of its day.
      "from": (has(state.?cursor.last_to) ?
          timestamp(state.cursor.last_to)
        :
          (now() - duration(string(int(state.lookbehind)*24) + "h"))
      ).format(time_layout.DateOnly).parse_time(time_layout.DateOnly),
      // Window end: exactly one day after the (untruncated) start, truncated
      // to the start of its day in the same way.
      "to": (has(state.?cursor.last_to) ?
          (timestamp(state.cursor.last_to) + duration("24h"))
        :
          (now() - duration(string(int(state.lookbehind)*24) + "h") + duration("24h"))
      ).format(time_layout.DateOnly).parse_time(time_layout.DateOnly),
    }.as(req,
      (req.to > now()) ?
        // We would fetch data in the future, back off
        {
          "events": [],
          "cursor": {
            // We don't change the last_to, we're just waiting.
            // req.from is the day-truncated window start, so writing it back
            // keeps the cursor where it was. Writing req.to here would move
            // the cursor a day ahead without having fetched that day.
            "last_to": req.from
          },
          "want_more": false,
        }
      :
        get_request(state.url + "?" + {
            "from": [req.from.format(time_layout.RFC3339)],
            "to": [req.to.format(time_layout.RFC3339)],
          }.format_query()
        ).with({
          "Header": {
            "Authorization": ["ApiKey " + state.api_key]
          }
        }).do_request().as(resp,
          resp.StatusCode == 200 ?
            (
              // Response is successful, but did we get any data?
              bytes(resp.Body).decode_json().as(body,
                has(body.instances) && size(body.instances) > 0 ?
                  {
                    // One event per product line item of every instance,
                    // enriched with deployment, organization and window info.
                    "events": body.instances.map(instance,
                      instance.product_line_items.map(line_item, {
                        "ess": {
                          "billing": line_item.with({
                            "deployment_name": instance.name, // Include deployment name
                            "deployment_id": instance.id, // Include deployment ID
                            "organization_id": state.organization_id, // Include organization ID
                            "from": req.from.format(time_layout.RFC3339),
                            "to": req.to.format(time_layout.RFC3339),
                          })
                        }
                      })
                    ).flatten(), // Pass line_items as events, with added info
                    "cursor": {
                      "last_to": req.to
                    },
                    // Are we more than 1 day behind?
                    "want_more": req.to < now() - duration("24h"),
                  }
                :
                  // We don't have any data, but we still need to return an event
                  // Otherwise the "want_more" logic will not work
                  {
                    "events": [{
                      "fake": true, // This will either way be dropped by the ingest pipeline
                    }],
                    "cursor": {
                      "last_to": req.to
                    },
                    // Are we more than 1 day behind?
                    "want_more": req.to < now() - duration("24h"),
                  }
              )
            )
          :
            // Response was not successful, return an error event
            {
              "events": {
                "error": {
                  "code": string(resp.StatusCode),
                  "id": string(resp.Status),
                  "message": "GET " + resp.Request.URL + ": " + (
                    (size(resp.Body) != 0) ?
                      string(resp.Body)
                    :
                      string(resp.Status) + ' (' + string(resp.StatusCode) + ')'
                  ),
                },
              },
              // The window was not fetched: keep the cursor at its start so
              // the same window is requested again instead of being skipped.
              "cursor": {
                "last_to": req.from
              },
              // Do not re-evaluate immediately after a failure; retry on the
              // next interval rather than looping against a failing API.
              "want_more": false,
            }
        )
    )
  )
{{#if tags.length}}
tags:
{{else if preserve_original_event}}
tags:
{{/if}}
{{#each tags as |tag i|}}
  - {{tag}}
{{/each}}
{{#if preserve_original_event}}
  - preserve_original_event
{{/if}}
{{#contains "forwarded" tags}}
publisher_pipeline.disable_host: true
{{/contains}}
{{#if processors}}
processors:
{{processors}}
{{/if}}