x-pack/platform/plugins/shared/fleet/server/services/output.ts (1,081 lines of code) (raw):
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { v5 as uuidv5 } from 'uuid';
import { omit } from 'lodash';
import { load } from 'js-yaml';
import deepEqual from 'fast-deep-equal';
import { indexBy } from 'lodash/fp';
import type {
ElasticsearchClient,
KibanaRequest,
SavedObject,
SavedObjectsClientContract,
} from '@kbn/core/server';
import { SavedObjectsUtils } from '@kbn/core/server';
import _ from 'lodash';
import pMap from 'p-map';
import {
getDefaultPresetForEsOutput,
outputTypeSupportPresets,
outputYmlIncludesReservedPerformanceKey,
} from '../../common/services/output_helpers';
import type {
NewOutput,
Output,
OutputSOAttributes,
AgentPolicy,
OutputSoKafkaAttributes,
OutputSoRemoteElasticsearchAttributes,
PolicySecretReference,
} from '../types';
import {
LEGACY_AGENT_POLICY_SAVED_OBJECT_TYPE,
PACKAGE_POLICY_SAVED_OBJECT_TYPE,
DEFAULT_OUTPUT,
DEFAULT_OUTPUT_ID,
OUTPUT_SAVED_OBJECT_TYPE,
OUTPUT_HEALTH_DATA_STREAM,
MAX_CONCURRENT_BACKFILL_OUTPUTS_PRESETS,
} from '../constants';
import {
SO_SEARCH_LIMIT,
outputType,
kafkaSaslMechanism,
kafkaPartitionType,
kafkaCompressionType,
kafkaAcknowledgeReliabilityLevel,
RESERVED_CONFIG_YML_KEYS,
FLEET_APM_PACKAGE,
FLEET_SYNTHETICS_PACKAGE,
FLEET_SERVER_PACKAGE,
} from '../../common/constants';
import type { ValueOf } from '../../common/types';
import { normalizeHostsForAgents } from '../../common/services';
import {
FleetEncryptedSavedObjectEncryptionKeyRequired,
OutputInvalidError,
OutputUnauthorizedError,
FleetError,
} from '../errors';
import type { OutputType } from '../types';
import { agentPolicyService } from './agent_policy';
import { packagePolicyService } from './package_policy';
import { appContextService } from './app_context';
import { escapeSearchQueryPhrase } from './saved_object';
import { auditLoggingService } from './audit_logging';
import {
deleteOutputSecrets,
deleteSecrets,
extractAndUpdateOutputSecrets,
extractAndWriteOutputSecrets,
isOutputSecretStorageEnabled,
} from './secrets';
import { findAgentlessPolicies } from './outputs/helpers';
import { patchUpdateDataWithRequireEncryptedAADFields } from './outputs/so_helpers';
type Nullable<T> = { [P in keyof T]: T[P] | null };
const SAVED_OBJECT_TYPE = OUTPUT_SAVED_OBJECT_TYPE;
const DEFAULT_ES_HOSTS = ['http://localhost:9200'];
const fakeRequest = {
headers: {},
getBasePath: () => '',
path: '/',
route: { settings: {} },
url: {
href: '/',
},
raw: {
req: {
url: '/',
},
},
} as unknown as KibanaRequest;
// differentiate
function isUUID(val: string) {
return (
typeof val === 'string' &&
val.match(/[0-9a-fA-F]{8}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{12}/)
);
}
export function outputIdToUuid(id: string) {
if (isUUID(id)) {
return id;
}
// UUID v5 need a namespace (uuid.DNS), changing this params will result in loosing the ability to generate predicable uuid
return uuidv5(id, uuidv5.DNS);
}
export function outputSavedObjectToOutput(so: SavedObject<OutputSOAttributes>): Output {
const logger = appContextService.getLogger();
const { output_id: outputId, ssl, proxy_id: proxyId, ...attributes } = so.attributes;
let parsedSsl;
try {
parsedSsl = typeof ssl === 'string' ? JSON.parse(ssl) : undefined;
} catch (e) {
logger.warn(`Unable to parse ssl for output ${so.id}: ${e.message}`);
logger.warn(`ssl value: ${ssl}`);
}
return {
id: outputId ?? so.id,
...attributes,
...(parsedSsl ? { ssl: parsedSsl } : {}),
...(proxyId ? { proxy_id: proxyId } : {}),
};
}
async function getAgentPoliciesPerOutput(outputId?: string, isDefault?: boolean) {
const internalSoClientWithoutSpaceExtension =
appContextService.getInternalUserSOClientWithoutSpaceExtension();
let agentPoliciesKuery: string;
const packagePoliciesKuery: string = `${PACKAGE_POLICY_SAVED_OBJECT_TYPE}.output_id:"${outputId}"`;
if (outputId) {
if (isDefault) {
agentPoliciesKuery = `${LEGACY_AGENT_POLICY_SAVED_OBJECT_TYPE}.data_output_id:"${outputId}" or not ${LEGACY_AGENT_POLICY_SAVED_OBJECT_TYPE}.data_output_id:*`;
} else {
agentPoliciesKuery = `${LEGACY_AGENT_POLICY_SAVED_OBJECT_TYPE}.data_output_id:"${outputId}"`;
}
} else {
if (isDefault) {
agentPoliciesKuery = `not ${LEGACY_AGENT_POLICY_SAVED_OBJECT_TYPE}.data_output_id:*`;
} else {
return;
}
}
// Get agent policies directly using output
const directAgentPolicies = await agentPolicyService.list(internalSoClientWithoutSpaceExtension, {
kuery: agentPoliciesKuery,
perPage: SO_SEARCH_LIMIT,
spaceId: '*',
});
const directAgentPolicyIds = directAgentPolicies?.items.map((policy) => policy.id);
// Get package policies using output and derive agent policies from that which
// are not already identfied above. The IDs cannot be used as part of the kuery
// above since the underlying saved object client .find() only filters on attributes
const packagePolicySOs = await packagePolicyService.list(internalSoClientWithoutSpaceExtension, {
kuery: packagePoliciesKuery,
perPage: SO_SEARCH_LIMIT,
spaceId: '*',
});
const agentPolicyIdsFromPackagePolicies = [
...new Set(
packagePolicySOs?.items.reduce((acc: string[], packagePolicy) => {
return [
...acc,
...packagePolicy.policy_ids.filter((id) => !directAgentPolicyIds?.includes(id)),
];
}, [])
),
];
const agentPoliciesFromPackagePolicies = await agentPolicyService.getByIds(
internalSoClientWithoutSpaceExtension,
agentPolicyIdsFromPackagePolicies.map((id) => ({ id, spaceId: '*' }))
);
const agentPoliciesIndexedById = indexBy(
(policy) => policy.id,
[...directAgentPolicies.items, ...agentPoliciesFromPackagePolicies]
);
// Bulk fetch package policies with only needed fields
if (Object.keys(agentPoliciesIndexedById).length) {
const { items: packagePolicies } = await packagePolicyService.list(
internalSoClientWithoutSpaceExtension,
{
fields: ['policy_ids', 'package.name'],
kuery: [FLEET_APM_PACKAGE, FLEET_SYNTHETICS_PACKAGE, FLEET_SERVER_PACKAGE]
.map((packageName) => `${PACKAGE_POLICY_SAVED_OBJECT_TYPE}.package.name:${packageName}`)
.join(' or '),
}
);
for (const packagePolicy of packagePolicies) {
for (const policyId of packagePolicy.policy_ids) {
if (agentPoliciesIndexedById[policyId]) {
if (!agentPoliciesIndexedById[policyId].package_policies) {
agentPoliciesIndexedById[policyId].package_policies = [];
}
agentPoliciesIndexedById[policyId].package_policies?.push(packagePolicy);
}
}
}
}
return Object.values(agentPoliciesIndexedById);
}
async function validateLogstashOutputNotUsedInAPMPolicy(outputId?: string, isDefault?: boolean) {
const agentPolicies = await getAgentPoliciesPerOutput(outputId, isDefault);
// Validate no policy with APM use that policy
if (agentPolicies) {
for (const agentPolicy of agentPolicies) {
if (agentPolicyService.hasAPMIntegration(agentPolicy)) {
throw new OutputInvalidError('Logstash output cannot be used with APM integration.');
}
}
}
}
async function findPoliciesWithFleetServerOrSynthetics(outputId?: string, isDefault?: boolean) {
const internalSoClientWithoutSpaceExtension =
appContextService.getInternalUserSOClientWithoutSpaceExtension();
let agentPolicies: AgentPolicy[] | undefined;
if (outputId) {
agentPolicies = await getAgentPoliciesPerOutput(outputId, isDefault);
} else {
const { items: packagePolicies } = await packagePolicyService.list(
internalSoClientWithoutSpaceExtension,
{
fields: ['policy_ids', 'package.name'],
spaceId: '*',
kuery: [FLEET_APM_PACKAGE, FLEET_SYNTHETICS_PACKAGE, FLEET_SERVER_PACKAGE]
.map((packageName) => `${PACKAGE_POLICY_SAVED_OBJECT_TYPE}.package.name:${packageName}`)
.join(' or '),
}
);
const agentPolicyIds = _.uniq(packagePolicies.flatMap((p) => p.policy_ids));
if (agentPolicyIds.length) {
agentPolicies = await agentPolicyService.getByIds(
internalSoClientWithoutSpaceExtension,
agentPolicyIds.map((id) => ({ id, spaceId: '*' }))
);
for (const packagePolicy of packagePolicies) {
for (const policyId of packagePolicy.policy_ids) {
const agentPolicy = agentPolicies.find((p) => p.id === policyId);
if (agentPolicy) {
if (!agentPolicy.package_policies) {
agentPolicy.package_policies = [];
}
agentPolicy.package_policies?.push(packagePolicy);
}
}
}
}
}
const policiesWithFleetServer =
agentPolicies?.filter((policy) => agentPolicyService.hasFleetServerIntegration(policy)) || [];
const policiesWithSynthetics =
agentPolicies?.filter((policy) => agentPolicyService.hasSyntheticsIntegration(policy)) || [];
return { policiesWithFleetServer, policiesWithSynthetics };
}
function validateOutputNotUsedInPolicy(
agentPolicies: AgentPolicy[],
dataOutputType: ValueOf<OutputType>,
integrationName: string
) {
// Validate no policy with this integration uses that output
for (const agentPolicy of agentPolicies) {
throw new OutputInvalidError(
`${_.capitalize(
dataOutputType
)} output cannot be used with ${integrationName} integration in ${
agentPolicy.name
}. Please create a new Elasticsearch output.`
);
}
}
async function validateTypeChanges(
esClient: ElasticsearchClient,
id: string,
data: Nullable<Partial<OutputSOAttributes>>,
originalOutput: Output,
defaultDataOutputId: string | null,
fromPreconfiguration: boolean
) {
const internalSoClientWithoutSpaceExtension =
appContextService.getInternalUserSOClientWithoutSpaceExtension();
const mergedIsDefault = data.is_default ?? originalOutput.is_default;
const { policiesWithFleetServer, policiesWithSynthetics } =
await findPoliciesWithFleetServerOrSynthetics(id, mergedIsDefault);
const agentlessPolicies = await findAgentlessPolicies(id);
if (data.type === outputType.Logstash || originalOutput.type === outputType.Logstash) {
await validateLogstashOutputNotUsedInAPMPolicy(id, mergedIsDefault);
}
// prevent changing an ES output to a non-local ES output if it's used by an invalid policy
if (
originalOutput.type === outputType.Elasticsearch &&
data?.type !== outputType.Elasticsearch &&
data.type
) {
// Validate no policy with fleet server, synthetics, or agentless policies use that output
validateOutputNotUsedInPolicy(policiesWithFleetServer, data.type, 'Fleet Server');
validateOutputNotUsedInPolicy(policiesWithSynthetics, data.type, 'Synthetics');
validateOutputNotUsedInPolicy(agentlessPolicies, data.type, 'agentless');
}
await updateAgentPoliciesDataOutputId(
internalSoClientWithoutSpaceExtension,
esClient,
data,
mergedIsDefault,
defaultDataOutputId,
_.uniq([...policiesWithFleetServer, ...policiesWithSynthetics, ...agentlessPolicies]),
fromPreconfiguration
);
}
async function updateAgentPoliciesDataOutputId(
soClient: SavedObjectsClientContract,
esClient: ElasticsearchClient,
data: Nullable<Partial<OutputSOAttributes>>,
isDefault: boolean,
defaultDataOutputId: string | null,
agentPolicies: AgentPolicy[],
fromPreconfiguration: boolean
) {
// if a non-local ES output is about to be updated to become default
// and fleet server, synthetics, or agentless policies don't have
// data_output_id set, update them to use the current default output ID
if (data?.type !== outputType.Elasticsearch && isDefault) {
for (const policy of agentPolicies) {
if (!policy.data_output_id) {
await agentPolicyService.update(
soClient,
esClient,
policy.id,
{
data_output_id: defaultDataOutputId,
},
{ force: fromPreconfiguration }
);
}
}
}
}
class OutputService {
private get encryptedSoClient() {
return appContextService.getInternalUserSOClient(fakeRequest);
}
private async _getDefaultDataOutputsSO() {
const outputs = await this.encryptedSoClient.find<OutputSOAttributes>({
type: OUTPUT_SAVED_OBJECT_TYPE,
searchFields: ['is_default'],
search: 'true',
});
for (const output of outputs.saved_objects) {
auditLoggingService.writeCustomSoAuditLog({
action: 'get',
id: output.id,
name: output.attributes.name,
savedObjectType: OUTPUT_SAVED_OBJECT_TYPE,
});
}
return outputs;
}
private async _getDefaultMonitoringOutputsSO(soClient: SavedObjectsClientContract) {
const outputs = await this.encryptedSoClient.find<OutputSOAttributes>({
type: OUTPUT_SAVED_OBJECT_TYPE,
searchFields: ['is_default_monitoring'],
search: 'true',
});
for (const output of outputs.saved_objects) {
auditLoggingService.writeCustomSoAuditLog({
action: 'get',
id: output.id,
name: output.attributes.name,
savedObjectType: OUTPUT_SAVED_OBJECT_TYPE,
});
}
return outputs;
}
private async _updateDefaultOutput(
soClient: SavedObjectsClientContract,
defaultDataOutputId: string,
updateData: { is_default: boolean } | { is_default_monitoring: boolean },
fromPreconfiguration: boolean
) {
const originalOutput = await this.get(soClient, defaultDataOutputId);
this._validateFieldsAreEditable(
originalOutput,
updateData,
defaultDataOutputId,
fromPreconfiguration
);
auditLoggingService.writeCustomSoAuditLog({
action: 'update',
id: outputIdToUuid(defaultDataOutputId),
name: originalOutput.name,
savedObjectType: OUTPUT_SAVED_OBJECT_TYPE,
});
return await this.encryptedSoClient.update<Nullable<OutputSOAttributes>>(
SAVED_OBJECT_TYPE,
outputIdToUuid(defaultDataOutputId),
updateData
);
}
private _validateFieldsAreEditable(
originalOutput: Output,
data: Partial<Output>,
id: string,
fromPreconfiguration: boolean
) {
if (originalOutput.is_preconfigured) {
if (!fromPreconfiguration) {
const allowEditFields = originalOutput.allow_edit ?? [];
const allKeys = Array.from(new Set([...Object.keys(data)])) as Array<keyof Output>;
for (const key of allKeys) {
if (
(!!originalOutput[key] || !!data[key]) &&
!allowEditFields.includes(key) &&
!deepEqual(originalOutput[key], data[key])
) {
throw new OutputUnauthorizedError(
`Preconfigured output ${id} ${key} cannot be updated outside of kibana config file.`
);
}
}
}
}
}
public async ensureDefaultOutput(
soClient: SavedObjectsClientContract,
esClient: ElasticsearchClient
) {
const outputs = await this.list(soClient);
const defaultOutput = outputs.items.find((o) => o.is_default);
const defaultMonitoringOutput = outputs.items.find((o) => o.is_default_monitoring);
if (!defaultOutput) {
const newDefaultOutput = {
...DEFAULT_OUTPUT,
hosts: this.getDefaultESHosts(),
ca_sha256: appContextService.getConfig()!.agents.elasticsearch.ca_sha256,
is_default_monitoring: !defaultMonitoringOutput,
} as NewOutput;
return await this.create(soClient, esClient, newDefaultOutput, {
id: DEFAULT_OUTPUT_ID,
overwrite: true,
});
}
return defaultOutput;
}
public getDefaultESHosts(): string[] {
const cloud = appContextService.getCloud();
const cloudUrl = cloud?.elasticsearchUrl;
const cloudHosts = cloudUrl ? [cloudUrl] : undefined;
const flagHosts =
appContextService.getConfig()!.agents?.elasticsearch?.hosts &&
appContextService.getConfig()!.agents.elasticsearch.hosts?.length
? appContextService.getConfig()!.agents.elasticsearch.hosts
: undefined;
return cloudHosts || flagHosts || DEFAULT_ES_HOSTS;
}
public async getDefaultDataOutputId(soClient: SavedObjectsClientContract) {
const outputs = await this._getDefaultDataOutputsSO();
if (!outputs.saved_objects.length) {
return null;
}
return outputSavedObjectToOutput(outputs.saved_objects[0]).id;
}
public async getDefaultMonitoringOutputId(soClient: SavedObjectsClientContract) {
const outputs = await this._getDefaultMonitoringOutputsSO(soClient);
if (!outputs.saved_objects.length) {
return null;
}
return outputSavedObjectToOutput(outputs.saved_objects[0]).id;
}
public async create(
soClient: SavedObjectsClientContract,
esClient: ElasticsearchClient,
output: NewOutput,
options?: {
id?: string;
fromPreconfiguration?: boolean;
overwrite?: boolean;
secretHashes?: Record<string, any>;
}
): Promise<Output> {
const logger = appContextService.getLogger();
logger.debug(`Creating new output`);
const data: OutputSOAttributes = { ...omit(output, ['ssl', 'secrets']) };
if (outputTypeSupportPresets(data.type)) {
if (
data.preset === 'balanced' &&
outputYmlIncludesReservedPerformanceKey(output.config_yaml ?? '', load)
) {
throw new OutputInvalidError(
`preset cannot be balanced when config_yaml contains one of ${RESERVED_CONFIG_YML_KEYS.join(
', '
)}`
);
}
}
const defaultDataOutputId = await this.getDefaultDataOutputId(soClient);
if (output.type === outputType.Logstash) {
await validateLogstashOutputNotUsedInAPMPolicy(undefined, data.is_default);
}
if (!appContextService.getEncryptedSavedObjectsSetup()?.canEncrypt) {
throw new FleetEncryptedSavedObjectEncryptionKeyRequired(
`${output.type} output needs encrypted saved object api key to be set`
);
}
const { policiesWithFleetServer, policiesWithSynthetics } =
await findPoliciesWithFleetServerOrSynthetics();
const agentlessPolicies = await findAgentlessPolicies();
await updateAgentPoliciesDataOutputId(
soClient,
esClient,
data,
data.is_default,
defaultDataOutputId,
_.uniq([...policiesWithFleetServer, ...policiesWithSynthetics, ...agentlessPolicies]),
options?.fromPreconfiguration ?? false
);
// ensure only default output exists
if (data.is_default) {
if (defaultDataOutputId && defaultDataOutputId !== options?.id) {
await this._updateDefaultOutput(
soClient,
defaultDataOutputId,
{ is_default: false },
options?.fromPreconfiguration ?? false
);
}
}
if (data.is_default_monitoring) {
const defaultMonitoringOutputId = await this.getDefaultMonitoringOutputId(soClient);
if (defaultMonitoringOutputId && defaultMonitoringOutputId !== options?.id) {
await this._updateDefaultOutput(
soClient,
defaultMonitoringOutputId,
{ is_default_monitoring: false },
options?.fromPreconfiguration ?? false
);
}
}
if (
(data.type === outputType.Elasticsearch || data.type === outputType.RemoteElasticsearch) &&
data.hosts
) {
data.hosts = data.hosts.map(normalizeHostsForAgents);
}
if (options?.id) {
data.output_id = options?.id;
}
if (output.ssl) {
data.ssl = JSON.stringify(output.ssl);
}
// Remove the shipper data if the shipper is not enabled from the yaml config
if (!output.config_yaml && output.shipper) {
data.shipper = null;
}
if (!data.preset && data.type === outputType.Elasticsearch) {
data.preset = getDefaultPresetForEsOutput(data.config_yaml ?? '', load);
}
if (output.config_yaml) {
const configJs = load(output.config_yaml);
const isShipperDisabled = !configJs?.shipper || configJs?.shipper?.enabled === false;
if (isShipperDisabled && output.shipper) {
data.shipper = null;
}
}
if (output.type === outputType.Kafka && data.type === outputType.Kafka) {
if (!output.version) {
data.version = '1.0.0';
}
if (!output.compression) {
data.compression = kafkaCompressionType.Gzip;
}
if (
!output.compression ||
(output.compression === kafkaCompressionType.Gzip && !output.compression_level)
) {
data.compression_level = 4;
}
if (!output.client_id) {
data.client_id = 'Elastic';
}
if (output.username && output.password && !output.sasl?.mechanism) {
data.sasl = {
mechanism: kafkaSaslMechanism.Plain,
};
}
if (!output.partition) {
data.partition = kafkaPartitionType.Hash;
}
if (output.partition === kafkaPartitionType.Random && !output.random?.group_events) {
data.random = {
group_events: 1,
};
}
if (output.partition === kafkaPartitionType.RoundRobin && !output.round_robin?.group_events) {
data.round_robin = {
group_events: 1,
};
}
if (!output.timeout) {
data.timeout = 30;
}
if (!output.broker_timeout) {
data.broker_timeout = 10;
}
if (output.required_acks === null || output.required_acks === undefined) {
// required_acks can be 0
data.required_acks = kafkaAcknowledgeReliabilityLevel.Commit;
}
}
const id = options?.id ? outputIdToUuid(options.id) : SavedObjectsUtils.generateId();
// Store secret values if enabled; if not, store plain text values
if (await isOutputSecretStorageEnabled(esClient, soClient)) {
const { output: outputWithSecrets } = await extractAndWriteOutputSecrets({
output,
esClient,
secretHashes: output.is_preconfigured ? options?.secretHashes : undefined,
});
if (outputWithSecrets.secrets) data.secrets = outputWithSecrets.secrets;
} else {
if (!output.ssl?.key && output.secrets?.ssl?.key) {
data.ssl = JSON.stringify({ ...output.ssl, ...output.secrets.ssl });
}
if (output.type === outputType.Kafka && data.type === outputType.Kafka) {
if (!output.password && output.secrets?.password) {
data.password = output.secrets?.password as string;
}
} else if (
output.type === outputType.RemoteElasticsearch &&
data.type === outputType.RemoteElasticsearch
) {
if (!output.service_token && output.secrets?.service_token) {
data.service_token = output.secrets?.service_token as string;
}
}
}
auditLoggingService.writeCustomSoAuditLog({
action: 'create',
id,
name: data.name,
savedObjectType: OUTPUT_SAVED_OBJECT_TYPE,
});
const newSo = await this.encryptedSoClient.create<OutputSOAttributes>(SAVED_OBJECT_TYPE, data, {
overwrite: options?.overwrite || options?.fromPreconfiguration,
id,
});
logger.debug(`Created new output ${id}`);
return outputSavedObjectToOutput(newSo);
}
public async bulkGet(ids: string[], { ignoreNotFound = false } = { ignoreNotFound: true }) {
const res = await this.encryptedSoClient.bulkGet<OutputSOAttributes>(
ids.map((id) => ({ id: outputIdToUuid(id), type: SAVED_OBJECT_TYPE }))
);
return res.saved_objects
.map((so) => {
if (so.error) {
if (!ignoreNotFound || so.error.statusCode !== 404) {
throw so.error;
}
return undefined;
}
return outputSavedObjectToOutput(so);
})
.filter((output): output is Output => typeof output !== 'undefined');
}
public async list(soClient: SavedObjectsClientContract) {
const outputs = await this.encryptedSoClient.find<OutputSOAttributes>({
type: SAVED_OBJECT_TYPE,
page: 1,
perPage: SO_SEARCH_LIMIT,
sortField: 'is_default',
sortOrder: 'desc',
});
for (const output of outputs.saved_objects) {
auditLoggingService.writeCustomSoAuditLog({
action: 'get',
id: output.id,
name: output.attributes.name,
savedObjectType: OUTPUT_SAVED_OBJECT_TYPE,
});
}
return {
items: outputs.saved_objects.map<Output>(outputSavedObjectToOutput),
total: outputs.total,
page: outputs.page,
perPage: outputs.per_page,
};
}
public async listAllForProxyId(soClient: SavedObjectsClientContract, proxyId: string) {
const outputs = await this.encryptedSoClient.find<OutputSOAttributes>({
type: SAVED_OBJECT_TYPE,
page: 1,
perPage: SO_SEARCH_LIMIT,
searchFields: ['proxy_id'],
search: escapeSearchQueryPhrase(proxyId),
});
for (const output of outputs.saved_objects) {
auditLoggingService.writeCustomSoAuditLog({
action: 'get',
id: output.id,
name: output.attributes.name,
savedObjectType: OUTPUT_SAVED_OBJECT_TYPE,
});
}
return {
items: outputs.saved_objects.map<Output>(outputSavedObjectToOutput),
total: outputs.total,
page: outputs.page,
perPage: outputs.per_page,
};
}
public async get(soClient: SavedObjectsClientContract, id: string): Promise<Output> {
const outputSO = await this.encryptedSoClient.get<OutputSOAttributes>(
SAVED_OBJECT_TYPE,
outputIdToUuid(id)
);
auditLoggingService.writeCustomSoAuditLog({
action: 'get',
id: outputSO.id,
name: outputSO?.attributes?.name,
savedObjectType: OUTPUT_SAVED_OBJECT_TYPE,
});
if (outputSO.error) {
throw new FleetError(outputSO.error.message);
}
return outputSavedObjectToOutput(outputSO);
}
public async delete(
soClient: SavedObjectsClientContract,
id: string,
{ fromPreconfiguration = false }: { fromPreconfiguration?: boolean } = {
fromPreconfiguration: false,
}
) {
const logger = appContextService.getLogger();
logger.debug(`Deleting output ${id}`);
const originalOutput = await this.get(soClient, id);
if (originalOutput.is_preconfigured && !fromPreconfiguration) {
throw new OutputUnauthorizedError(
`Preconfigured output ${id} cannot be deleted outside of kibana config file.`
);
}
if (originalOutput.is_default && !fromPreconfiguration) {
throw new OutputUnauthorizedError(`Default output ${id} cannot be deleted.`);
}
if (originalOutput.is_default_monitoring && !fromPreconfiguration) {
throw new OutputUnauthorizedError(`Default monitoring output ${id} cannot be deleted.`);
}
await packagePolicyService.removeOutputFromAll(
appContextService.getInternalUserESClient(),
id,
{
force: fromPreconfiguration,
}
);
await agentPolicyService.removeOutputFromAll(appContextService.getInternalUserESClient(), id, {
force: fromPreconfiguration,
});
auditLoggingService.writeCustomSoAuditLog({
action: 'delete',
id: outputIdToUuid(id),
name: originalOutput.name,
savedObjectType: OUTPUT_SAVED_OBJECT_TYPE,
});
const soDeleteResult = this.encryptedSoClient.delete(SAVED_OBJECT_TYPE, outputIdToUuid(id));
await deleteOutputSecrets({
esClient: appContextService.getInternalUserESClient(),
output: originalOutput,
});
logger.debug(`Deleted output ${id}`);
return soDeleteResult;
}
public async update(
soClient: SavedObjectsClientContract,
esClient: ElasticsearchClient,
id: string,
data: Partial<Output>,
{
fromPreconfiguration = false,
secretHashes,
}: { fromPreconfiguration: boolean; secretHashes?: Record<string, any> } = {
fromPreconfiguration: false,
}
) {
const logger = appContextService.getLogger();
logger.debug(`Updating output ${id}`);
let secretsToDelete: PolicySecretReference[] = [];
const originalOutput = await this.get(soClient, id);
this._validateFieldsAreEditable(originalOutput, data, id, fromPreconfiguration);
if (
(originalOutput.is_default && data.is_default === false) ||
(data.is_default_monitoring === false && originalOutput.is_default_monitoring)
) {
throw new OutputUnauthorizedError(
`Default output ${id} cannot be set to is_default=false or is_default_monitoring=false manually. Make another output the default first.`
);
}
const updateData: Nullable<Partial<OutputSOAttributes>> = { ...omit(data, ['ssl', 'secrets']) };
if (updateData.type && outputTypeSupportPresets(updateData.type)) {
if (
updateData.preset === 'balanced' &&
outputYmlIncludesReservedPerformanceKey(updateData.config_yaml ?? '', load)
) {
throw new OutputInvalidError(
`preset cannot be balanced when config_yaml contains one of ${RESERVED_CONFIG_YML_KEYS.join(
', '
)}`
);
}
}
const mergedType = data.type ?? originalOutput.type;
const mergedIsDefault = data.is_default ?? originalOutput.is_default;
const defaultDataOutputId = await this.getDefaultDataOutputId(soClient);
if (mergedType !== originalOutput.type || originalOutput.is_default !== mergedIsDefault) {
await validateTypeChanges(
esClient,
id,
updateData,
originalOutput,
defaultDataOutputId,
fromPreconfiguration
);
}
const removeKafkaFields = (target: Nullable<Partial<OutputSoKafkaAttributes>>) => {
target.version = null;
target.key = null;
target.compression = null;
target.compression_level = null;
target.connection_type = null;
target.client_id = null;
target.auth_type = null;
target.username = null;
target.password = null;
target.sasl = null;
target.partition = null;
target.random = null;
target.round_robin = null;
target.hash = null;
target.topic = null;
target.headers = null;
target.timeout = null;
target.broker_timeout = null;
target.required_acks = null;
target.ssl = null;
};
// If the output type changed
if (data.type && data.type !== originalOutput.type) {
if (data.type === outputType.Elasticsearch && updateData.type === outputType.Elasticsearch) {
updateData.preset = null;
}
if (data.type !== outputType.Kafka && originalOutput.type === outputType.Kafka) {
removeKafkaFields(updateData as Nullable<OutputSoKafkaAttributes>);
}
if (data.type === outputType.Logstash) {
// remove ES specific field
updateData.ca_trusted_fingerprint = null;
updateData.ca_sha256 = null;
delete (updateData as Nullable<OutputSoRemoteElasticsearchAttributes>).service_token;
delete (updateData as Nullable<OutputSoRemoteElasticsearchAttributes>).kibana_api_key;
}
if (data.type === outputType.Kafka && updateData.type === outputType.Kafka) {
updateData.ca_trusted_fingerprint = null;
updateData.ca_sha256 = null;
if (!data.version) {
updateData.version = '1.0.0';
}
if (!data.compression) {
updateData.compression = kafkaCompressionType.Gzip;
}
if (
!data.compression ||
(data.compression === kafkaCompressionType.Gzip && !data.compression_level)
) {
updateData.compression_level = 4;
}
if (data.compression && data.compression !== kafkaCompressionType.Gzip) {
// Clear compression level if compression is not gzip
updateData.compression_level = null;
}
if (!data.client_id) {
updateData.client_id = 'Elastic';
}
if (data.username && data.password && !data.sasl?.mechanism) {
updateData.sasl = {
mechanism: kafkaSaslMechanism.Plain,
};
}
if (!data.partition) {
updateData.partition = kafkaPartitionType.Hash;
}
if (data.partition === kafkaPartitionType.Random && !data.random?.group_events) {
updateData.random = {
group_events: 1,
};
}
if (data.partition === kafkaPartitionType.RoundRobin && !data.round_robin?.group_events) {
updateData.round_robin = {
group_events: 1,
};
}
if (!data.timeout) {
updateData.timeout = 30;
}
if (!data.broker_timeout) {
updateData.broker_timeout = 10;
}
if (updateData.required_acks === null || updateData.required_acks === undefined) {
// required_acks can be 0
updateData.required_acks = kafkaAcknowledgeReliabilityLevel.Commit;
}
}
}
if (data.ssl) {
updateData.ssl = JSON.stringify(data.ssl);
} else if (data.ssl === null) {
// Explicitly set to null to allow to delete the field
updateData.ssl = null;
}
if (data.type === outputType.Kafka && updateData.type === outputType.Kafka) {
if (!data.password) {
updateData.password = null;
}
if (!data.username) {
updateData.username = null;
}
if (!data.sasl) {
updateData.sasl = null;
}
if (!data.ssl) {
updateData.ssl = null;
}
}
// ensure only default output exists
if (data.is_default) {
if (defaultDataOutputId && defaultDataOutputId !== id) {
await this._updateDefaultOutput(
soClient,
defaultDataOutputId,
{ is_default: false },
fromPreconfiguration
);
}
}
if (data.is_default_monitoring) {
const defaultMonitoringOutputId = await this.getDefaultMonitoringOutputId(soClient);
if (defaultMonitoringOutputId && defaultMonitoringOutputId !== id) {
await this._updateDefaultOutput(
soClient,
defaultMonitoringOutputId,
{ is_default_monitoring: false },
fromPreconfiguration
);
}
}
if (
(mergedType === outputType.Elasticsearch || mergedType === outputType.RemoteElasticsearch) &&
updateData.hosts
) {
updateData.hosts = updateData.hosts.map(normalizeHostsForAgents);
}
if (
data.type === outputType.RemoteElasticsearch &&
updateData.type === outputType.RemoteElasticsearch
) {
if (!data.service_token) {
updateData.service_token = null;
}
if (!data.kibana_api_key) {
updateData.kibana_api_key = null;
}
}
if (!data.preset && data.type === outputType.Elasticsearch) {
updateData.preset = getDefaultPresetForEsOutput(data.config_yaml ?? '', load);
}
// Remove the shipper data if the shipper is not enabled from the yaml config
if (!data.config_yaml && data.shipper) {
updateData.shipper = null;
}
if (data.config_yaml) {
const configJs = load(data.config_yaml);
const isShipperDisabled = !configJs?.shipper || configJs?.shipper?.enabled === false;
if (isShipperDisabled && data.shipper) {
updateData.shipper = null;
}
}
// Store secret values if enabled; if not, store plain text values
if (await isOutputSecretStorageEnabled(esClient, soClient)) {
const secretsRes = await extractAndUpdateOutputSecrets({
oldOutput: originalOutput,
outputUpdate: data,
esClient,
secretHashes: data.is_preconfigured ? secretHashes : undefined,
});
updateData.secrets = secretsRes.outputUpdate.secrets;
secretsToDelete = secretsRes.secretsToDelete;
} else {
if (!data.ssl?.key && data.secrets?.ssl?.key) {
updateData.ssl = JSON.stringify({ ...data.ssl, ...data.secrets.ssl });
}
if (data.type === outputType.Kafka && updateData.type === outputType.Kafka) {
if (!data.password && data.secrets?.password) {
updateData.password = data.secrets?.password as string;
}
} else if (
data.type === outputType.RemoteElasticsearch &&
updateData.type === outputType.RemoteElasticsearch
) {
if (!data.service_token && data.secrets?.service_token) {
updateData.service_token = data.secrets?.service_token as string;
}
}
}
patchUpdateDataWithRequireEncryptedAADFields(updateData, originalOutput);
auditLoggingService.writeCustomSoAuditLog({
action: 'update',
id: outputIdToUuid(id),
name: originalOutput.name,
savedObjectType: OUTPUT_SAVED_OBJECT_TYPE,
});
const outputSO = await this.encryptedSoClient.update<Nullable<OutputSOAttributes>>(
SAVED_OBJECT_TYPE,
outputIdToUuid(id),
updateData
);
if (outputSO.error) {
throw new FleetError(outputSO.error.message);
}
if (secretsToDelete.length) {
try {
await deleteSecrets({ esClient, ids: secretsToDelete.map((s) => s.id) });
} catch (err) {
logger.warn(`Error cleaning up secrets for output ${id}: ${err.message}`);
}
}
logger.debug(`Updated output ${id}`);
}
public async backfillAllOutputPresets(
soClient: SavedObjectsClientContract,
esClient: ElasticsearchClient
) {
const outputs = await this.list(soClient);
await pMap(
outputs.items.filter((output) => outputTypeSupportPresets(output.type) && !output.preset),
async (output) => {
const preset = getDefaultPresetForEsOutput(output.config_yaml ?? '', load);
await outputService.update(
soClient,
esClient,
output.id,
{ preset },
{ fromPreconfiguration: true }
);
await agentPolicyService.bumpAllAgentPoliciesForOutput(esClient, output.id);
},
{
concurrency: MAX_CONCURRENT_BACKFILL_OUTPUTS_PRESETS,
}
);
}
async getLatestOutputHealth(esClient: ElasticsearchClient, id: string): Promise<OutputHealth> {
const lastUpdateTime = await this.getOutputLastUpdateTime(id);
const mustFilter = [];
if (lastUpdateTime) {
mustFilter.push({
range: {
'@timestamp': {
gte: lastUpdateTime,
},
},
});
}
const response = await esClient.search(
{
index: OUTPUT_HEALTH_DATA_STREAM,
query: { bool: { filter: { term: { output: id } }, must: mustFilter } },
sort: { '@timestamp': 'desc' },
size: 1,
},
{ ignore: [404] }
);
if (!response.hits || response.hits.hits.length === 0) {
return {
state: 'UNKNOWN',
message: '',
timestamp: '',
};
}
const latestHit = response.hits.hits[0]._source as any;
return {
state: latestHit.state,
message: latestHit.message ?? '',
timestamp: latestHit['@timestamp'],
};
}
async getOutputLastUpdateTime(id: string): Promise<string | undefined> {
const outputSO = await this.encryptedSoClient.get<OutputSOAttributes>(
SAVED_OBJECT_TYPE,
outputIdToUuid(id)
);
if (outputSO.error) {
appContextService
.getLogger()
.debug(
`Error getting output ${id} SO, using updated_at:undefined, cause: ${outputSO.error.message}`
);
return undefined;
}
return outputSO.updated_at;
}
}
interface OutputHealth {
state: string;
message: string;
timestamp: string;
}
export const outputService = new OutputService();