x-pack/solutions/security/plugins/security_solution/server/lib/telemetry/receiver.ts

/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0; you may not use this file except in compliance with the Elastic License
 * 2.0.
 */

import os from 'os';
import { cloneDeep } from 'lodash';
import type {
  Logger,
  LogMeta,
  CoreStart,
  IScopedClusterClient,
  ElasticsearchClient,
  SavedObjectsClientContract,
} from '@kbn/core/server';
import type {
  AggregationsAggregate,
  IlmExplainLifecycleRequest,
  OpenPointInTimeResponse,
  SearchRequest,
  SearchResponse,
  SearchHit,
  SearchRequest as ESSearchRequest,
  SortResults,
  IndicesGetDataStreamRequest,
  IndicesStatsRequest,
  IlmGetLifecycleRequest,
  IndicesGetRequest,
  NodesStatsRequest,
  Duration,
  IndicesGetIndexTemplateRequest,
} from '@elastic/elasticsearch/lib/api/types';
import { ENDPOINT_ARTIFACT_LISTS } from '@kbn/securitysolution-list-constants';
import {
  EQL_RULE_TYPE_ID,
  INDICATOR_RULE_TYPE_ID,
  ML_RULE_TYPE_ID,
  NEW_TERMS_RULE_TYPE_ID,
  QUERY_RULE_TYPE_ID,
  SAVED_QUERY_RULE_TYPE_ID,
  SIGNALS_ID,
  THRESHOLD_RULE_TYPE_ID,
  ESQL_RULE_TYPE_ID,
} from '@kbn/securitysolution-rules';
import type { TransportResult } from '@elastic/elasticsearch';
import type { AgentPolicy, Installation } from '@kbn/fleet-plugin/common';
import type {
  AgentClient,
  AgentPolicyServiceInterface,
  PackageService,
} from '@kbn/fleet-plugin/server';
import type { ExceptionListClient } from '@kbn/lists-plugin/server';
import moment from 'moment';
import { DEFAULT_DIAGNOSTIC_INDEX_PATTERN } from '../../../common/endpoint/constants';
import type { ExperimentalFeatures } from '../../../common';
import type { EndpointAppContextService } from '../../endpoint/endpoint_app_context_services';
import {
  exceptionListItemToTelemetryEntry,
  trustedApplicationToTelemetryEntry,
  ruleExceptionListItemToTelemetryEvent,
  setClusterInfo,
  newTelemetryLogger,
} from './helpers';
import { Fetcher } from '../../endpoint/routes/resolver/tree/utils/fetch';
import type { TreeOptions, TreeResponse } from '../../endpoint/routes/resolver/tree/utils/fetch';
import type { SafeEndpointEvent, ResolverSchema } from '../../../common/endpoint/types';
import type {
  TelemetryEvent,
  EnhancedAlertEvent,
  EndpointMetricDocument,
  ESLicense,
  ESClusterInfo,
  GetEndpointListResponse,
  RuleSearchResult,
  ExceptionListItem,
  ValueListResponse,
  ValueListResponseAggregation,
  ValueListItemsResponseAggregation,
  ValueListExceptionListResponseAggregation,
  ValueListIndicatorMatchResponseAggregation,
  Nullable,
  EndpointMetricsAggregation,
  EndpointMetricsAbstract,
  EndpointPolicyResponseDocument,
  EndpointPolicyResponseAggregation,
  EndpointMetadataAggregation,
  EndpointMetadataDocument,
} from './types';
import { telemetryConfiguration } from './configuration';
import { ENDPOINT_METRICS_INDEX } from '../../../common/constants';
import { PREBUILT_RULES_PACKAGE_NAME } from '../../../common/detection_engine/constants';
import type { TelemetryLogger } from './telemetry_logger';
import type {
  DataStream,
  IlmPhase,
  IlmPhases,
  IlmPolicy,
  IlmStats,
  Index,
  IndexSettings,
  IndexStats,
  IndexTemplateInfo,
} from './indices.metadata.types';
import { chunkStringsByMaxLength } from './collections_helpers';
import type {
  NodeIngestPipelinesStats,
  Pipeline,
  Processor,
  Totals,
} from './ingest_pipelines_stats.types';
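
/*
 * Usage sketch (illustrative only, not part of this module): the receiver is
 * constructed by a caller such as the plugin's telemetry task infrastructure
 * and started with the services it needs; after that, the fetch/get methods
 * are read-only queries. The `core`, `getIndexForType`, `alertsIndex` and
 * `endpointContextService` values below are assumed to come from the plugin's
 * start contract (all `start()` parameters are optional).
 *
 *   const receiver = new TelemetryReceiver(logger);
 *   await receiver.start(core, getIndexForType, alertsIndex, endpointContextService);
 *
 *   // Async-generator methods are consumed page by page, for example:
 *   for await (const page of receiver.fetchDiagnosticAlertsBatch(from, to)) {
 *     // forward `page` to the telemetry channel
 *   }
 */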

export interface ITelemetryReceiver {
  start(
    core?: CoreStart,
    getIndexForType?: (type: string) => string,
    alertsIndex?: string,
    endpointContextService?: EndpointAppContextService,
    exceptionListClient?: ExceptionListClient,
    packageService?: PackageService
  ): Promise<void>;

  getClusterInfo(): Nullable<ESClusterInfo>;

  fetchClusterInfo(): Promise<ESClusterInfo>;

  getLicenseInfo(): Nullable<ESLicense>;

  fetchLicenseInfo(): Promise<Nullable<ESLicense>>;

  closePointInTime(pitId: string): Promise<void>;

  fetchDetectionRulesPackageVersion(): Promise<Nullable<Installation>>;

  /**
   * As the policy id + policy version does not exist on the Endpoint Metrics document
   * we need to fetch information about the Fleet Agent and sync the metrics document
   * with the Agent's policy data.
   *
   * @returns Map of agent id to policy id
   */
  fetchFleetAgents(): Promise<Map<string, string>>;

  /**
   * Reads Endpoint Agent policy responses out of the `.ds-metrics-endpoint.policy*` data
   * stream and creates a local K/V structure that stores the policy response (V) with
   * the Endpoint Agent Id (K). A value will only exist if there has been an endpoint
   * enrolled in the last 24 hours OR a policy change has occurred. We only send
   * non-successful responses. If the field is null, we assume no responses in
   * the last 24h or no failures/warnings in the policy applied.
   */
  fetchEndpointPolicyResponses(
    executeFrom: string,
    executeTo: string
  ): Promise<Map<string, EndpointPolicyResponseDocument>>;

  /**
   * Reads Endpoint Agent metrics out of the `.ds-metrics-endpoint.metrics` data stream
   * and buckets them by Endpoint Agent id and sorts by the top hit. The EP agent will
   * report its metrics once per day OR every time a policy change has occurred. If
   * a metric document(s) exists for an EP agent we map to fleet agent and policy.
   */
  fetchEndpointMetricsAbstract(
    executeFrom: string,
    executeTo: string
  ): Promise<EndpointMetricsAbstract>;

  fetchEndpointMetricsById(ids: string[]): AsyncGenerator<EndpointMetricDocument[], void, unknown>;

  /**
   * Reads Endpoint Agent metadata out of the `.ds-metrics-endpoint.metadata` data stream
   * and buckets them by Endpoint Agent id and sorts by the top hit. The EP agent will
   * report its metadata once per day OR every time a policy change has occurred.
   * If a metadata document(s) exists for an EP agent we map to fleet agent and policy.
   */
  fetchEndpointMetadata(
    executeFrom: string,
    executeTo: string
  ): Promise<Map<string, EndpointMetadataDocument>>;

  fetchDiagnosticAlertsBatch(
    executeFrom: string,
    executeTo: string
  ): AsyncGenerator<TelemetryEvent[], void, unknown>;

  /**
   * Using a PIT executes the given query and returns the results in pages.
   * The page size is calculated using the mean of a sample of N documents
   * executing the same query. The query must have a sort attribute.
* * @param index The index to search * @param query The query to use * @returns An async generator of pages of results * * @see {@link https://www.elastic.co/guide/en/elasticsearch/reference/current/paginate-search-results.html} * @see {ITelemetryReceiver#setMaxPageSizeBytes} * @see {ITelemetryReceiver#setNumDocsToSample} */ paginate<T>(index: string, query: ESSearchRequest): AsyncGenerator<T[], void, unknown>; fetchPolicyConfigs(id: string): Promise<AgentPolicy | null | undefined>; fetchTrustedApplications(): Promise<{ data: ExceptionListItem[] | undefined; total: number; page: number; per_page: number; }>; fetchEndpointList(listId: string): Promise<GetEndpointListResponse>; fetchDetectionRules(): Promise< TransportResult< SearchResponse<RuleSearchResult, Record<string, AggregationsAggregate>>, unknown > >; fetchDetectionExceptionList( listId: string, ruleVersion: number ): Promise<{ data: ExceptionListItem[]; total: number; page: number; per_page: number; }>; fetchPrebuiltRuleAlertsBatch( index: string, executeFrom: string, executeTo: string ): AsyncGenerator<TelemetryEvent[], void, unknown>; fetchTimelineAlerts( index: string, rangeFrom: string, rangeTo: string ): Promise<Array<SearchHit<EnhancedAlertEvent>>>; buildProcessTree( entityId: string, resolverSchema: ResolverSchema, startOfDay: string, endOfDay: string, agentId: string ): TreeResponse; fetchTimelineEvents( nodeIds: string[] ): Promise<SearchResponse<SafeEndpointEvent, Record<string, AggregationsAggregate>>>; fetchValueListMetaData(interval: number): Promise<ValueListResponse>; getAlertsIndex(): string | undefined; getExperimentalFeatures(): ExperimentalFeatures | undefined; setMaxPageSizeBytes(bytes: number): void; setNumDocsToSample(n: number): void; getIndices(): Promise<IndexSettings[]>; getDataStreams(): Promise<DataStream[]>; getIndicesStats(indices: string[], chunkSize: number): AsyncGenerator<IndexStats, void, unknown>; getIlmsStats(indices: string[], chunkSize: number): AsyncGenerator<IlmStats, void, unknown>; getIndexTemplatesStats(): Promise<IndexTemplateInfo[]>; getIlmsPolicies(ilms: string[], chunkSize: number): AsyncGenerator<IlmPolicy, void, unknown>; getIngestPipelinesStats(timeout: Duration): Promise<NodeIngestPipelinesStats[]>; } export class TelemetryReceiver implements ITelemetryReceiver { private readonly logger: TelemetryLogger; private agentClient?: AgentClient; private agentPolicyService?: AgentPolicyServiceInterface; private _esClient?: ElasticsearchClient; private exceptionListClient?: ExceptionListClient; private soClient?: SavedObjectsClientContract; private getIndexForType?: (type: string) => string; private alertsIndex?: string; private clusterInfo?: ESClusterInfo; private licenseInfo?: Nullable<ESLicense>; private processTreeFetcher?: Fetcher; private packageService?: PackageService; private experimentalFeatures: ExperimentalFeatures | undefined; private readonly maxRecords = 10_000 as const; // default to 2% of host's total memory or 80MiB, whichever is smaller private maxPageSizeBytes: number = Math.min(os.totalmem() * 0.02, 80 * 1024 * 1024); // number of docs to query to estimate the size of a single doc private numDocsToSample: number = 10; constructor(logger: Logger) { this.logger = newTelemetryLogger(logger.get('telemetry_events.receiver')); } public async start( core?: CoreStart, getIndexForType?: (type: string) => string, alertsIndex?: string, endpointContextService?: EndpointAppContextService, exceptionListClient?: ExceptionListClient, packageService?: PackageService ) { 
this.getIndexForType = getIndexForType; this.alertsIndex = alertsIndex; this.agentClient = endpointContextService?.getInternalFleetServices().agent; this.agentPolicyService = endpointContextService?.getInternalFleetServices().agentPolicy; this._esClient = core?.elasticsearch.client.asInternalUser; this.exceptionListClient = exceptionListClient; this.packageService = packageService; this.soClient = core?.savedObjects.createInternalRepository() as unknown as SavedObjectsClientContract; this.clusterInfo = await this.fetchClusterInfo(); this.licenseInfo = await this.fetchLicenseInfo(); this.experimentalFeatures = endpointContextService?.experimentalFeatures; const elasticsearch = core?.elasticsearch.client as unknown as IScopedClusterClient; this.processTreeFetcher = new Fetcher(elasticsearch); setClusterInfo(this.clusterInfo); } public getClusterInfo(): ESClusterInfo | undefined { return this.clusterInfo; } public getLicenseInfo(): Nullable<ESLicense> { return this.licenseInfo; } public getAlertsIndex(): string | undefined { return this.alertsIndex; } public getExperimentalFeatures(): ExperimentalFeatures | undefined { return this.experimentalFeatures; } public async fetchDetectionRulesPackageVersion(): Promise<Installation | undefined> { return this.packageService?.asInternalUser.getInstallation(PREBUILT_RULES_PACKAGE_NAME); } public async fetchFleetAgents() { if (this.esClient === undefined || this.esClient === null) { throw Error('elasticsearch client is unavailable: cannot retrieve fleet agents'); } return ( this.agentClient ?.listAgents({ perPage: this.maxRecords, showInactive: true, kuery: 'status:*', // include unenrolled agents sortField: 'enrolled_at', sortOrder: 'desc', }) .then((response) => { const agents = response?.agents ?? []; return agents.reduce((cache, agent) => { if (agent.policy_id !== null && agent.policy_id !== undefined) { cache.set(agent.id, agent.policy_id); } return cache; }, new Map<string, string>()); }) ?? new Map() ); } public async fetchEndpointPolicyResponses(executeFrom: string, executeTo: string) { const query: SearchRequest = { expand_wildcards: ['open' as const, 'hidden' as const], index: `.ds-metrics-endpoint.policy*`, ignore_unavailable: false, size: 0, // no query results required - only aggregation quantity query: { range: { '@timestamp': { gte: executeFrom, lt: executeTo, }, }, }, aggs: { policy_responses: { terms: { size: this.maxRecords, field: 'agent.id', }, aggs: { latest_response: { top_hits: { size: 1, _source: { includes: [ 'agent', 'event', 'Endpoint.policy.applied.status', 'Endpoint.policy.applied.actions', 'Endpoint.policy.applied.artifacts.global', 'Endpoint.configuration', 'Endpoint.state', ], }, sort: [ { '@timestamp': { order: 'desc' as const, }, }, ], }, }, }, }, }, }; return this.esClient() .search(query, { meta: true }) .then((response) => response.body as unknown as EndpointPolicyResponseAggregation) .then((failedPolicyResponses) => { const buckets = failedPolicyResponses?.aggregations?.policy_responses?.buckets ?? 
[]; // If there is no policy responses in the 24h > now then we will continue return buckets.reduce( (cache, endpointAgentId) => cache.set(endpointAgentId.key, endpointAgentId.latest_response.hits.hits[0]._source), new Map<string, EndpointPolicyResponseDocument>() ); }); } public async fetchEndpointMetricsAbstract(executeFrom: string, executeTo: string) { const query: SearchRequest = { expand_wildcards: ['open' as const, 'hidden' as const], index: ENDPOINT_METRICS_INDEX, ignore_unavailable: false, size: 0, // no query results required - only aggregation quantity query: { range: { '@timestamp': { gte: executeFrom, lt: executeTo, }, }, }, aggs: { endpoint_agents: { terms: { field: 'agent.id', size: this.maxRecords, }, aggs: { latest_metrics: { top_hits: { size: 1, _source: false, sort: [ { '@timestamp': { order: 'desc' as const, }, }, ], }, }, }, }, endpoint_count: { cardinality: { field: 'agent.id', }, }, }, }; return this.esClient() .search(query, { meta: true }) .then((response) => response.body as unknown as EndpointMetricsAggregation) .then((endpointMetricsResponse) => { const buckets = endpointMetricsResponse?.aggregations?.endpoint_agents?.buckets ?? []; const endpointMetricIds = buckets.map( (epMetrics) => epMetrics.latest_metrics.hits.hits[0]._id ); const totalEndpoints = buckets.length; return { endpointMetricIds, totalEndpoints }; }); } fetchEndpointMetricsById(ids: string[]): AsyncGenerator<EndpointMetricDocument[], void, unknown> { const query: ESSearchRequest = { sort: [{ '@timestamp': { order: 'desc' as const } }], query: { ids: { values: ids, }, }, _source: { includes: ['@timestamp', 'agent', 'Endpoint.metrics', 'elastic.agent', 'host', 'event'], }, }; return this.paginate<EndpointMetricDocument>(ENDPOINT_METRICS_INDEX, query); } public async fetchEndpointMetadata(executeFrom: string, executeTo: string) { const query: SearchRequest = { expand_wildcards: ['open' as const, 'hidden' as const], index: `.ds-metrics-endpoint.metadata-*`, ignore_unavailable: false, size: 0, // no query results required - only aggregation quantity query: { range: { '@timestamp': { gte: executeFrom, lt: executeTo, }, }, }, aggs: { endpoint_metadata: { terms: { field: 'agent.id', size: this.maxRecords, }, aggs: { latest_metadata: { top_hits: { size: 1, _source: { includes: ['@timestamp', 'agent', 'Endpoint.capabilities', 'elastic.agent'], }, sort: [ { '@timestamp': { order: 'desc' as const, }, }, ], }, }, }, }, }, }; return this.esClient() .search(query, { meta: true }) .then((response) => response.body as unknown as EndpointMetadataAggregation) .then((endpointMetadataResponse) => { const buckets = endpointMetadataResponse?.aggregations?.endpoint_metadata?.buckets ?? 
[]; return buckets.reduce((cache, endpointAgentId) => { const doc = endpointAgentId.latest_metadata.hits.hits[0]._source; cache.set(endpointAgentId.key, doc); return cache; }, new Map<string, EndpointMetadataDocument>()); }); } public async *fetchDiagnosticAlertsBatch(executeFrom: string, executeTo: string) { this.logger.l('Searching diagnostic alerts', { from: executeFrom, to: executeTo, } as LogMeta); let pitId = await this.openPointInTime(DEFAULT_DIAGNOSTIC_INDEX_PATTERN); let fetchMore = true; let searchAfter: SortResults | undefined; const query: ESSearchRequest = { query: { range: { 'event.ingested': { gte: executeFrom, lt: executeTo, }, }, }, track_total_hits: false, sort: [ { 'event.ingested': { order: 'desc' as const, }, }, ], pit: { id: pitId }, search_after: searchAfter, size: telemetryConfiguration.telemetry_max_buffer_size, }; let response = null; while (fetchMore) { try { response = await this.esClient().search(query); const numOfHits = response?.hits.hits.length; if (numOfHits > 0) { const lastHit = response?.hits.hits[numOfHits - 1]; query.search_after = lastHit?.sort; } else { fetchMore = false; } this.logger.l('Diagnostic alerts to return', { numOfHits } as LogMeta); fetchMore = numOfHits > 0 && numOfHits < telemetryConfiguration.telemetry_max_buffer_size; } catch (e) { this.logger.warn('Error fetching alerts', { error_message: e.message } as LogMeta); fetchMore = false; } if (response == null) { await this.closePointInTime(pitId); return; } const alerts = response?.hits.hits.flatMap((h) => h._source != null ? ([h._source] as TelemetryEvent[]) : [] ); if (response?.pit_id != null) { pitId = response?.pit_id; } yield alerts; } await this.closePointInTime(pitId); } public async fetchPolicyConfigs(id: string) { if (this.soClient === undefined || this.soClient === null) { throw Error( 'saved object client is unavailable: cannot retrieve endpoint policy configurations' ); } return this.agentPolicyService?.get(this.soClient, id); } public async fetchTrustedApplications() { if (this?.exceptionListClient === undefined || this?.exceptionListClient === null) { throw Error('exception list client is unavailable: cannot retrieve trusted applications'); } // Ensure list is created if it does not exist await this.exceptionListClient.createTrustedAppsList(); const timeFrom = moment.utc().subtract(1, 'day').valueOf(); const results = await this.exceptionListClient.findExceptionListItem({ listId: ENDPOINT_ARTIFACT_LISTS.trustedApps.id, page: 1, perPage: 10_000, filter: `exception-list-agnostic.attributes.created_at >= ${timeFrom}`, namespaceType: 'agnostic', sortField: 'name', sortOrder: 'asc', }); return { data: results?.data.map(trustedApplicationToTelemetryEntry), total: results?.total ?? 0, page: results?.page ?? 1, per_page: results?.per_page ?? 
this.maxRecords, }; } public async fetchEndpointList(listId: string): Promise<GetEndpointListResponse> { if (this?.exceptionListClient === undefined || this?.exceptionListClient === null) { throw Error('exception list client is unavailable: could not retrieve trusted applications'); } // Ensure list is created if it does not exist await this.exceptionListClient.createEndpointList(); const timeFrom = moment.utc().subtract(1, 'day').valueOf(); const results = await this.exceptionListClient.findExceptionListItem({ listId, page: 1, perPage: this.maxRecords, filter: `exception-list-agnostic.attributes.created_at >= ${timeFrom}`, namespaceType: 'agnostic', sortField: 'name', sortOrder: 'asc', }); return { data: results?.data.map(exceptionListItemToTelemetryEntry) ?? [], total: results?.total ?? 0, page: results?.page ?? 1, per_page: results?.per_page ?? this.maxRecords, }; } /** * Gets the elastic rules which are the rules that have immutable set to true and are of a particular rule type * @returns The elastic rules */ public async fetchDetectionRules() { const query: SearchRequest = { expand_wildcards: ['open' as const, 'hidden' as const], index: this.getIndexForType?.('alert'), ignore_unavailable: true, size: this.maxRecords, query: { bool: { must: [ { bool: { filter: { terms: { 'alert.alertTypeId': [ SIGNALS_ID, EQL_RULE_TYPE_ID, ESQL_RULE_TYPE_ID, ML_RULE_TYPE_ID, QUERY_RULE_TYPE_ID, SAVED_QUERY_RULE_TYPE_ID, INDICATOR_RULE_TYPE_ID, THRESHOLD_RULE_TYPE_ID, NEW_TERMS_RULE_TYPE_ID, ], }, }, }, }, { bool: { filter: { terms: { 'alert.params.immutable': [true], }, }, }, }, ], }, }, }; return this.esClient().search<RuleSearchResult>(query, { meta: true }); } public async fetchDetectionExceptionList(listId: string, ruleVersion: number) { if (this?.exceptionListClient === undefined || this?.exceptionListClient === null) { throw Error('exception list client is unavailable: could not retrieve trusted applications'); } // Ensure list is created if it does not exist await this.exceptionListClient.createTrustedAppsList(); const timeFrom = `exception-list.attributes.created_at >= ${moment .utc() .subtract(24, 'hours') .valueOf()}`; const results = await this.exceptionListClient?.findExceptionListsItem({ listId: [listId], filter: [timeFrom], perPage: this.maxRecords, page: 1, sortField: 'exception-list.created_at', sortOrder: 'desc', namespaceType: ['single'], }); return { data: results?.data.map((r) => ruleExceptionListItemToTelemetryEvent(r, ruleVersion)) ?? [], total: results?.total ?? 0, page: results?.page ?? 1, per_page: results?.per_page ?? 
this.maxRecords, }; } public async *fetchPrebuiltRuleAlertsBatch( index: string, executeFrom: string, executeTo: string ) { this.logger.l('Searching prebuilt rule alerts from', { executeFrom, executeTo, } as LogMeta); let pitId = await this.openPointInTime(index); let fetchMore = true; let searchAfter: SortResults | undefined; const query: ESSearchRequest = { query: { bool: { filter: [ { bool: { should: [ { bool: { must_not: { bool: { should: [ { match_phrase: { 'kibana.alert.rule.name': 'Malware Prevention Alert', }, }, ], }, }, }, }, { bool: { must_not: { bool: { should: [ { match_phrase: { 'kibana.alert.rule.name': 'Malware Detection Alert', }, }, ], }, }, }, }, { bool: { must_not: { bool: { should: [ { match_phrase: { 'kibana.alert.rule.name': 'Ransomware Prevention Alert', }, }, ], }, }, }, }, { bool: { must_not: { bool: { should: [ { match_phrase: { 'kibana.alert.rule.name': 'Ransomware Detection Alert', }, }, ], }, }, }, }, ], }, }, { bool: { should: [ { match_phrase: { 'kibana.alert.rule.parameters.immutable': 'true', }, }, ], }, }, { range: { '@timestamp': { gte: executeFrom, lte: executeTo, }, }, }, ], }, }, track_total_hits: false, sort: [ { '@timestamp': { order: 'asc', format: 'strict_date_optional_time_nanos' } }, { _shard_doc: 'desc' }, ], pit: { id: pitId }, search_after: searchAfter, size: 1_000, }; let response = null; try { while (fetchMore) { response = await this.esClient().search(query); const numOfHits = response?.hits.hits.length; if (numOfHits > 0) { const lastHit = response?.hits.hits[numOfHits - 1]; query.search_after = lastHit?.sort; } else { fetchMore = false; } fetchMore = numOfHits > 0 && numOfHits < 1_000; if (response == null) { await this.closePointInTime(pitId); return; } const alerts: TelemetryEvent[] = response.hits.hits.flatMap((h) => h._source != null ? 
([h._source] as TelemetryEvent[]) : [] ); if (response?.pit_id != null) { pitId = response?.pit_id; } this.logger.l('Prebuilt rule alerts to return', { alerts: alerts.length } as LogMeta); yield alerts; } } catch (e) { // to keep backward compatibility with the previous implementation, silent return // once we start using `paginate` this error should be managed downstream this.logger.warn('Error fetching alerts', { error_message: e.message } as LogMeta); return; } finally { await this.closePointInTime(pitId); } } private async openPointInTime(indexPattern: string) { const keepAlive = '5m'; const pitId: OpenPointInTimeResponse['id'] = ( await this.esClient().openPointInTime({ index: `${indexPattern}*`, keep_alive: keepAlive, expand_wildcards: ['open' as const, 'hidden' as const], }) ).id; return pitId; } public async closePointInTime(pitId: string) { try { await this.esClient().closePointInTime({ id: pitId }); } catch (error) { this.logger.warn('Error trying to close point in time', { pit: pitId, error_message: error.message, } as LogMeta); } } async fetchTimelineAlerts(index: string, rangeFrom: string, rangeTo: string) { // default is from looking at Kibana saved objects and online documentation const keepAlive = '5m'; // create and assign an initial point in time let pitId: OpenPointInTimeResponse['id'] = ( await this.esClient().openPointInTime({ index: `${index}*`, keep_alive: keepAlive, }) ).id; let fetchMore = true; let searchAfter: SortResults | undefined; let alertsToReturn: Array<SearchHit<EnhancedAlertEvent>> = []; while (fetchMore) { const query: ESSearchRequest = { query: { bool: { filter: [ { bool: { should: [ { match_phrase: { 'event.module': 'endpoint', }, }, ], }, }, { bool: { should: [ { match_phrase: { 'kibana.alert.rule.parameters.immutable': 'true', }, }, ], }, }, { range: { '@timestamp': { gte: rangeFrom, lte: rangeTo, }, }, }, ], }, }, aggs: { endpoint_alert_count: { cardinality: { field: 'event.id', }, }, }, track_total_hits: false, sort: [ { '@timestamp': { order: 'asc', format: 'strict_date_optional_time_nanos' } }, { _shard_doc: 'desc' }, ] as unknown as string[], pit: { id: pitId }, search_after: searchAfter, size: 1000, }; let response = null; try { response = await this.esClient().search<EnhancedAlertEvent>(query); const numOfHits = response?.hits.hits.length; if (numOfHits > 0) { const lastHit = response?.hits.hits[numOfHits - 1]; searchAfter = lastHit?.sort; } fetchMore = numOfHits > 0; } catch (e) { this.logger.warn('Error fetching alerts', { error_message: e.message } as LogMeta); fetchMore = false; } const alerts = response?.hits.hits; alertsToReturn = alertsToReturn.concat(alerts ?? 
[]); if (response?.pit_id != null) { pitId = response?.pit_id; } } try { await this.esClient().closePointInTime({ id: pitId }); } catch (error) { this.logger.warn('Error trying to close point in time', { pit: pitId, error_message: error.message, keepAlive, } as LogMeta); } this.logger.l('Timeline alerts to return', { alerts: alertsToReturn.length }); return alertsToReturn || []; } public async buildProcessTree( entityId: string, resolverSchema: ResolverSchema, startOfDay: string, endOfDay: string, agentId: string ): TreeResponse { if (this.processTreeFetcher === undefined || this.processTreeFetcher === null) { throw Error( 'resolver tree builder is unavailable: cannot build encoded endpoint event graph' ); } const request: TreeOptions = { ancestors: 200, descendants: 500, timeRange: { from: startOfDay, to: endOfDay, }, schema: resolverSchema, nodes: [entityId], indexPatterns: [`${this.alertsIndex}*`, 'logs-*'], descendantLevels: 20, agentId, }; return this.processTreeFetcher.tree(request, true); } public async fetchTimelineEvents(nodeIds: string[]) { const query: SearchRequest = { expand_wildcards: ['open' as const, 'hidden' as const], index: [`${this.alertsIndex}*`, 'logs-*'], ignore_unavailable: true, size: 100, _source: { include: [ '@timestamp', 'process', 'event', 'file', 'network', 'dns', 'kibana.rule.alert.uuid', ], }, query: { bool: { filter: [ { terms: { 'process.entity_id': nodeIds, }, }, { term: { 'event.category': 'process', }, }, ], }, }, }; return this.esClient().search<SafeEndpointEvent>(query); } public async fetchValueListMetaData(_interval: number) { const listQuery: SearchRequest = { expand_wildcards: ['open' as const, 'hidden' as const], index: '.lists-*', ignore_unavailable: true, size: 0, // no query results required - only aggregation quantity aggs: { total_value_list_count: { cardinality: { field: 'name', }, }, type_breakdown: { terms: { field: 'type', size: 50, }, }, }, }; const itemQuery: SearchRequest = { expand_wildcards: ['open' as const, 'hidden' as const], index: '.items-*', ignore_unavailable: true, size: 0, // no query results required - only aggregation quantity aggs: { value_list_item_count: { terms: { field: 'list_id', size: 100, }, }, }, }; const exceptionListQuery: SearchRequest = { expand_wildcards: ['open' as const, 'hidden' as const], index: this.getIndexForType?.('exception-list'), ignore_unavailable: true, size: 0, // no query results required - only aggregation quantity query: { bool: { must: [{ match: { 'exception-list.entries.type': 'list' } }], }, }, aggs: { vl_included_in_exception_lists_count: { cardinality: { field: 'exception-list.entries.list.id', }, }, }, }; const indicatorMatchRuleQuery: SearchRequest = { expand_wildcards: ['open' as const, 'hidden' as const], index: this.getIndexForType?.('alert'), ignore_unavailable: true, size: 0, query: { bool: { must: [{ prefix: { 'alert.params.threatIndex': '.items' } }], }, }, aggs: { vl_used_in_indicator_match_rule_count: { cardinality: { field: 'alert.params.ruleId', }, }, }, }; const [listMetrics, itemMetrics, exceptionListMetrics, indicatorMatchMetrics] = await Promise.all([ this.esClient().search(listQuery), this.esClient().search(itemQuery), this.esClient().search(exceptionListQuery), this.esClient().search(indicatorMatchRuleQuery), ]); const listMetricsResponse = listMetrics as unknown as ValueListResponseAggregation; const itemMetricsResponse = itemMetrics as unknown as ValueListItemsResponseAggregation; const exceptionListMetricsResponse = exceptionListMetrics as unknown as 
ValueListExceptionListResponseAggregation;
    const indicatorMatchMetricsResponse =
      indicatorMatchMetrics as unknown as ValueListIndicatorMatchResponseAggregation;

    return {
      listMetricsResponse,
      itemMetricsResponse,
      exceptionListMetricsResponse,
      indicatorMatchMetricsResponse,
    };
  }

  public async fetchClusterInfo(): Promise<ESClusterInfo> {
    // @ts-expect-error version.build_date is of type estypes.DateTime
    return this.esClient().info();
  }

  public async fetchLicenseInfo(): Promise<Nullable<ESLicense>> {
    try {
      const ret = (await this.esClient().transport.request({
        method: 'GET',
        path: '/_license',
        querystring: {
          local: true,
        },
      })) as { license: ESLicense };
      return ret.license;
    } catch (err) {
      this.logger.warn('failed retrieving license', { error_message: err.message } as LogMeta);
      return undefined;
    }
  }

  // calculates the number of documents that can be returned per page
  // or "-1" if the query returns no documents
  private async docsPerPage(index: string, query: ESSearchRequest): Promise<number> {
    const sampleQuery: ESSearchRequest = {
      query: cloneDeep(query.query),
      size: this.numDocsToSample,
      index,
    };
    const sampleSizeBytes = await this.esClient()
      .search<undefined>(sampleQuery)
      .then((r) => r.hits.hits.reduce((sum, hit) => JSON.stringify(hit._source).length + sum, 0));
    const docSizeBytes = sampleSizeBytes / this.numDocsToSample;

    if (docSizeBytes === 0) {
      return -1;
    }

    return Math.max(Math.floor(this.maxPageSizeBytes / docSizeBytes), 1);
  }

  public async *paginate<T>(index: string, query: ESSearchRequest) {
    if (query.sort == null) {
      throw Error('Not possible to paginate a query without a sort attribute');
    }

    const size = await this.docsPerPage(index, query);
    if (size === -1) {
      return;
    }

    const pit = {
      id: await this.openPointInTime(index),
    };
    const esQuery: ESSearchRequest = {
      ...cloneDeep(query),
      pit,
      size: Math.min(size, 10_000),
    };

    try {
      do {
        const response = await this.nextPage(esQuery);
        const hits = response?.hits.hits.length ?? 0;
        if (hits === 0) {
          return;
        }
        esQuery.search_after = response?.hits.hits[hits - 1]?.sort;

        const data = response?.hits.hits.flatMap((h) =>
          h._source != null ?
([h._source] as T[]) : []
        );

        yield data;
      } while (esQuery.search_after !== undefined);
    } catch (e) {
      this.logger.warn('Error running paginated query', { error_message: e.message } as LogMeta);
      throw e;
    } finally {
      await this.closePointInTime(pit.id);
    }
  }

  private async nextPage(
    esQuery: ESSearchRequest
  ): Promise<SearchResponse<unknown, Record<string, AggregationsAggregate>>> {
    return this.esClient().search(esQuery);
  }

  public setMaxPageSizeBytes(bytes: number) {
    this.maxPageSizeBytes = bytes;
  }

  public setNumDocsToSample(n: number) {
    this.numDocsToSample = n;
  }

  private esClient(): ElasticsearchClient {
    if (this._esClient === undefined || this._esClient === null) {
      throw Error('elasticsearch client is unavailable');
    }
    return this._esClient;
  }

  public async getIndices(): Promise<IndexSettings[]> {
    const es = this.esClient();

    this.logger.l('Fetching indices');

    const request: IndicesGetRequest = {
      index: '*',
      expand_wildcards: ['open', 'hidden'],
      filter_path: [
        '*.mappings._source.mode',
        '*.settings.index.default_pipeline',
        '*.settings.index.final_pipeline',
        '*.settings.index.mode',
        '*.settings.index.provided_name',
      ],
    };

    return es.indices
      .get(request)
      .then((indices) =>
        Object.entries(indices).map(([index, value]) => {
          return {
            index_name: index,
            default_pipeline: value.settings?.index?.default_pipeline,
            final_pipeline: value.settings?.index?.final_pipeline,
            index_mode: value.settings?.index?.mode,
            source_mode: value.mappings?._source?.mode,
          } as IndexSettings;
        })
      )
      .catch((error) => {
        this.logger.warn('Error fetching indices', { error_message: error } as LogMeta);
        throw error;
      });
  }

  public async getDataStreams(): Promise<DataStream[]> {
    const es = this.esClient();

    this.logger.l('Fetching data streams');

    const request: IndicesGetDataStreamRequest = {
      name: '*',
      expand_wildcards: ['open', 'hidden'],
      filter_path: [
        'data_streams.ilm_policy',
        'data_streams.indices.ilm_policy',
        'data_streams.indices.index_name',
        'data_streams.name',
        'data_streams.template',
      ],
    };

    return es.indices
      .getDataStream(request)
      .then((response) =>
        response.data_streams.map((ds) => {
          return {
            datastream_name: ds.name,
            ilm_policy: ds.ilm_policy,
            template: ds.template,
            indices:
              ds.indices?.map((index) => {
                return {
                  index_name: index.index_name,
                  ilm_policy: index.ilm_policy,
                } as Index;
              }) ??
[],
          } as DataStream;
        })
      )
      .catch((error) => {
        this.logger.warn('Error fetching datastreams', { error_message: error } as LogMeta);
        throw error;
      });
  }

  public async *getIndicesStats(indices: string[], chunkSize: number) {
    const es = this.esClient();
    const safeChunkSize = Math.min(chunkSize, 3000);

    this.logger.l('Fetching indices stats');

    const groupedIndices = chunkStringsByMaxLength(indices, safeChunkSize);
    this.logger.l('Split indices into groups', {
      groups: groupedIndices.length,
      indices: indices.length,
    } as LogMeta);

    for (const group of groupedIndices) {
      const request: IndicesStatsRequest = {
        index: group,
        level: 'indices',
        metric: ['docs', 'search', 'store', 'indexing'],
        expand_wildcards: ['open', 'hidden'],
        filter_path: [
          'indices.*.total.search.query_total',
          'indices.*.total.search.query_time_in_millis',
          'indices.*.total.docs.count',
          'indices.*.total.docs.deleted',
          'indices.*.total.store.size_in_bytes',
          'indices.*.primaries.docs.count',
          'indices.*.primaries.docs.deleted',
          'indices.*.primaries.store.size_in_bytes',
          'indices.*.total.indexing.index_failed',
          'indices.*.total.indexing.index_failed_due_to_version_conflict',
        ],
      };

      try {
        const response = await es.indices.stats(request);
        for (const [indexName, stats] of Object.entries(response.indices ?? {})) {
          yield {
            index_name: indexName,
            query_total: stats.total?.search?.query_total,
            query_time_in_millis: stats.total?.search?.query_time_in_millis,
            docs_count: stats.total?.docs?.count,
            docs_deleted: stats.total?.docs?.deleted,
            docs_total_size_in_bytes: stats.total?.store?.size_in_bytes,
            index_failed: stats.total?.indexing?.index_failed,
            // eslint-disable-next-line @typescript-eslint/no-explicit-any
            index_failed_due_to_version_conflict: (stats.total?.indexing as any)
              ?.index_failed_due_to_version_conflict,
            docs_count_primaries: stats.primaries?.docs?.count,
            docs_deleted_primaries: stats.primaries?.docs?.deleted,
            docs_total_size_in_bytes_primaries: stats.primaries?.store?.size_in_bytes,
          } as IndexStats;
        }
      } catch (error) {
        this.logger.warn('Error fetching indices stats', { error_message: error } as LogMeta);
        throw error;
      }
    }
  }

  public async *getIlmsStats(indices: string[], chunkSize: number) {
    const es = this.esClient();
    const safeChunkSize = Math.min(chunkSize, 3000);

    const groupedIndices = chunkStringsByMaxLength(indices, safeChunkSize);
    this.logger.l('Split ilms into groups', {
      groups: groupedIndices.length,
      indices: indices.length,
    } as LogMeta);

    for (const group of groupedIndices) {
      const request: IlmExplainLifecycleRequest = {
        index: group.join(','),
        only_managed: false,
        filter_path: ['indices.*.phase', 'indices.*.age', 'indices.*.policy'],
      };

      const data = await es.ilm.explainLifecycle(request);
      try {
        for (const [indexName, stats] of Object.entries(data.indices ??
{})) {
          const entry = {
            index_name: indexName,
            phase: ('phase' in stats && stats.phase) || undefined,
            age: ('age' in stats && stats.age) || undefined,
            policy_name: ('policy' in stats && stats.policy) || undefined,
          } as IlmStats;
          yield entry;
        }
      } catch (error) {
        this.logger.warn('Error fetching ilm stats', { error_message: error } as LogMeta);
        throw error;
      }
    }
  }

  public async getIndexTemplatesStats(): Promise<IndexTemplateInfo[]> {
    const es = this.esClient();

    this.logger.l('Fetching index templates');

    const request: IndicesGetIndexTemplateRequest = {
      name: '*',
      filter_path: [
        'index_templates.name',
        'index_templates.index_template.template.settings.index.mode',
        'index_templates.index_template.data_stream',
        'index_templates.index_template._meta.package.name',
        'index_templates.index_template._meta.managed_by',
        'index_templates.index_template._meta.beat',
        'index_templates.index_template._meta.managed',
        'index_templates.index_template.composed_of',
        'index_templates.index_template.template.mappings._source.enabled',
        'index_templates.index_template.template.mappings._source.includes',
        'index_templates.index_template.template.mappings._source.excludes',
      ],
    };

    return es.indices
      .getIndexTemplate(request)
      .then((response) =>
        response.index_templates.map((props) => {
          const datastream = props.index_template?.data_stream !== undefined;
          return {
            template_name: props.name,
            index_mode: props.index_template.template?.settings?.index?.mode,
            package_name: props.index_template._meta?.package?.name,
            datastream,
            managed_by: props.index_template._meta?.managed_by,
            beat: props.index_template._meta?.beat,
            is_managed: props.index_template._meta?.managed,
            composed_of: props.index_template.composed_of,
            source_enabled: props.index_template.template?.mappings?._source?.enabled,
            source_includes: props.index_template.template?.mappings?._source?.includes ?? [],
            source_excludes: props.index_template.template?.mappings?._source?.excludes ?? [],
          } as IndexTemplateInfo;
        })
      )
      .catch((error) => {
        this.logger.warn('Error fetching index templates', { error_message: error } as LogMeta);
        throw error;
      });
  }

  public async *getIlmsPolicies(ilms: string[], chunkSize: number) {
    const es = this.esClient();
    const safeChunkSize = Math.min(chunkSize, 3000);

    const phase = (obj: unknown): Nullable<IlmPhase> => {
      let value: Nullable<IlmPhase>;
      if (obj !== null && obj !== undefined && typeof obj === 'object' && 'min_age' in obj) {
        value = {
          min_age: obj.min_age,
        } as IlmPhase;
      }
      return value;
    };

    const groupedIlms = chunkStringsByMaxLength(ilms, safeChunkSize);
    this.logger.l('Split ilms into groups', {
      groups: groupedIlms.length,
      ilms: ilms.length,
    } as LogMeta);

    for (const group of groupedIlms) {
      this.logger.l('Fetching ilm policies');

      const request: IlmGetLifecycleRequest = {
        name: group.join(','),
        filter_path: [
          '*.policy.phases.cold.min_age',
          '*.policy.phases.delete.min_age',
          '*.policy.phases.frozen.min_age',
          '*.policy.phases.hot.min_age',
          '*.policy.phases.warm.min_age',
          '*.modified_date',
        ],
      };

      const response = await es.ilm.getLifecycle(request);
      try {
        for (const [policyName, stats] of Object.entries(response ??
{})) {
          yield {
            policy_name: policyName,
            modified_date: stats.modified_date,
            phases: {
              cold: phase(stats.policy.phases.cold),
              delete: phase(stats.policy.phases.delete),
              frozen: phase(stats.policy.phases.frozen),
              hot: phase(stats.policy.phases.hot),
              warm: phase(stats.policy.phases.warm),
            } as IlmPhases,
          } as IlmPolicy;
        }
      } catch (error) {
        this.logger.warn('Error fetching ilm policies', {
          error_message: error.message,
        } as LogMeta);
        throw error;
      }
    }
  }

  public async getIngestPipelinesStats(timeout: Duration): Promise<NodeIngestPipelinesStats[]> {
    const es = this.esClient();

    this.logger.l('Fetching ingest pipelines stats');

    const request: NodesStatsRequest = {
      metric: 'ingest',
      filter_path: [
        'nodes.*.ingest.total',
        'nodes.*.ingest.pipelines.*.count',
        'nodes.*.ingest.pipelines.*.time_in_millis',
        'nodes.*.ingest.pipelines.*.failed',
        'nodes.*.ingest.pipelines.*.current',
        'nodes.*.ingest.pipelines.*.processors.*.stats.count',
        'nodes.*.ingest.pipelines.*.processors.*.stats.time_in_millis',
        'nodes.*.ingest.pipelines.*.processors.*.stats.failed',
        'nodes.*.ingest.pipelines.*.processors.*.stats.current',
      ],
      timeout,
    };

    return es.nodes
      .stats(request)
      .then((response) => {
        return Object.entries(response.nodes).map(([nodeName, node]) => {
          return {
            name: nodeName,
            totals: {
              count: node.ingest?.total?.count ?? 0,
              time_in_millis: node.ingest?.total?.time_in_millis ?? 0,
              current: node.ingest?.total?.current ?? 0,
              failed: node.ingest?.total?.failed ?? 0,
            } as Totals,
            pipelines: Object.entries(node.ingest?.pipelines ?? []).map(
              ([pipelineName, pipeline]) => {
                return {
                  name: pipelineName,
                  totals: {
                    count: pipeline.count,
                    time_in_millis: pipeline.time_in_millis,
                    current: pipeline.current,
                    failed: pipeline.failed,
                  } as Totals,
                  processors: (pipeline.processors ?? [])
                    .map((processors) => {
                      return Object.entries(processors).map(([processorName, processor]) => {
                        return {
                          name: processorName,
                          totals: {
                            count: processor.stats?.count ?? 0,
                            time_in_millis: processor.stats?.time_in_millis ?? 0,
                            current: processor.stats?.current ?? 0,
                            failed: processor.stats?.failed ?? 0,
                          } as Totals,
                        } as Processor;
                      });
                    })
                    .flat(),
                } as Pipeline;
              }
            ),
          } as NodeIngestPipelinesStats;
        });
      })
      .catch((error) => {
        this.logger.warn('Error fetching ingest pipelines stats', {
          error_message: error,
        } as LogMeta);
        throw error;
      });
  }
}
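
/*
 * The fetch*Batch methods and paginate() above all follow the same
 * point-in-time (PIT) + search_after pattern. A minimal standalone sketch of
 * that pattern, assuming an ElasticsearchClient `esClient` and an index
 * pattern `index` supplied by the caller (page size of 1000 is illustrative):
 *
 *   const { id } = await esClient.openPointInTime({ index, keep_alive: '5m' });
 *   let searchAfter: SortResults | undefined;
 *   try {
 *     while (true) {
 *       const page = await esClient.search({
 *         pit: { id },
 *         sort: [{ '@timestamp': 'asc' }],
 *         search_after: searchAfter,
 *         size: 1000,
 *         track_total_hits: false,
 *       });
 *       const hits = page.hits.hits;
 *       if (hits.length === 0) break;
 *       searchAfter = hits[hits.length - 1].sort;
 *       // process hits...
 *       if (hits.length < 1000) break;
 *     }
 *   } finally {
 *     await esClient.closePointInTime({ id });
 *   }
 */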
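
/*
 * The indices-metadata collectors can be chained by a consuming telemetry
 * task: list indices first, then stream stats and ILM data for those indices
 * in chunks. An illustrative sketch only, assuming `receiver` is a started
 * TelemetryReceiver and `chunkSize` is the task's configured chunk size:
 *
 *   const indices = await receiver.getIndices();
 *   const indexNames = indices.map((i) => i.index_name);
 *
 *   for await (const stats of receiver.getIndicesStats(indexNames, chunkSize)) {
 *     // report IndexStats
 *   }
 *
 *   const ilmNames = new Set<string>();
 *   for await (const ilm of receiver.getIlmsStats(indexNames, chunkSize)) {
 *     if (ilm.policy_name !== undefined) ilmNames.add(ilm.policy_name);
 *   }
 *
 *   for await (const policy of receiver.getIlmsPolicies([...ilmNames], chunkSize)) {
 *     // report IlmPolicy
 *   }
 */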