packages/@aws-cdk/toolkit-lib/lib/api/logs-monitor/logs-monitor.ts (152 lines of code) (raw):
import * as util from 'util';
import type * as cxapi from '@aws-cdk/cx-api';
import * as chalk from 'chalk';
import * as uuid from 'uuid';
import type { CloudWatchLogEvent } from '../../payloads/logs-monitor';
import { flatten } from '../../util';
import type { SDK } from '../aws-auth/private';
import { IO } from '../io/private';
import type { IoHelper } from '../io/private';
/**
 * Per-environment bookkeeping for the log groups being monitored:
 * the client used to read them and how far each one has been read.
 */
interface LogGroupsAccessSettings {
  /**
   * SDK used to read from the log groups of one environment (account/region)
   */
  readonly sdk: SDK;

  /**
   * Read position per log group, keyed by log group name.
   *
   * Each value is the startTime from which the monitor will read
   * that log group's events on its next poll.
   */
  readonly logGroupsStartTimes: Record<string, number>;
}
/**
 * Construction options for `CloudWatchLogEventMonitor`
 */
export interface CloudWatchLogEventMonitorProps {
/**
* The IoHelper used to emit the monitor's messages
* (start/stop notifications, log events and polling errors)
*/
readonly ioHelper: IoHelper;
/**
* The time from which we start reading log messages
*
* @default - now
*/
readonly startTime?: Date;
}
/**
 * Polls a set of CloudWatch log groups and forwards every new log event
 * to the IoHelper.
 *
 * Log groups are registered with `addLogGroups()`. Polling starts with
 * `activate()` and stops with `deactivate()`. While active, exactly one
 * polling loop runs: each tick reads all registered log groups, prints the
 * new events and then schedules the next tick after `pollingInterval` ms.
 */
export class CloudWatchLogEventMonitor {
  /**
   * Determines which events not to display: log groups added to the
   * monitor are read starting from this timestamp (ms since epoch).
   */
  private startTime: number;

  /**
   * Map of environment (account:region) to LogGroupsAccessSettings
   */
  private readonly envsLogGroupsAccessSettings = new Map<string, LogGroupsAccessSettings>();

  /**
   * After reading events from all CloudWatch log groups
   * how long should we wait to read more events.
   *
   * If there is some error with reading events (i.e. Throttle)
   * then this is also how long we wait until we try again
   */
  private readonly pollingInterval: number = 2_000;

  /**
   * Handle of the timer for the next scheduled tick, if one is pending.
   * Kept so that at most one poll is ever queued and so that
   * `deactivate()` can cancel it.
   */
  private nextTick?: ReturnType<typeof setTimeout>;

  /**
   * Identifier of the current monitoring session.
   * `undefined` while the monitor is not active.
   */
  public monitorId?: string;

  private readonly ioHelper: IoHelper;

  constructor(props: CloudWatchLogEventMonitorProps) {
    this.startTime = props.startTime?.getTime() ?? Date.now();
    this.ioHelper = props.ioHelper;
  }

  /**
   * resume reading/printing events
   *
   * Starts a new monitoring session (with a fresh `monitorId`), announces it
   * and performs the first poll right away. Follow-up polls are scheduled
   * by the poll itself.
   */
  public async activate(): Promise<void> {
    this.monitorId = uuid.v4();
    await this.ioHelper.notify(IO.CDK_TOOLKIT_I5032.msg('Start monitoring log groups', {
      monitor: this.monitorId,
      logGroupNames: this.logGroupNames(),
    }));

    // tick() schedules its own successor. Scheduling another one here would
    // start a second, parallel polling loop that reads the same events again.
    await this.tick();
  }

  /**
   * deactivates the monitor so no new events are read
   * use case for this is when we are in the middle of performing a deployment
   * and don't want to interweave all the logs together with the CFN
   * deployment logs
   *
   * Also resets the start time to be when the new deployment was triggered
   * and clears the list of tracked log groups
   */
  public async deactivate(): Promise<void> {
    // NOTE(review): nothing here prevents deactivate() from running while the
    // monitor is inactive; in that case the id reported below is `undefined`.
    const oldMonitorId = this.monitorId!;
    this.monitorId = undefined;
    this.startTime = Date.now();

    // Drop the queued poll so it cannot leak into a later session.
    this.cancelNextTick();

    await this.ioHelper.notify(IO.CDK_TOOLKIT_I5034.msg('Stopped monitoring log groups', {
      monitor: oldMonitorId,
      logGroupNames: this.logGroupNames(),
    }));
    this.envsLogGroupsAccessSettings.clear();
  }

  /**
   * Adds CloudWatch log groups to read log events from.
   * Since we could be watching multiple stacks that deploy to
   * multiple environments (account+region), we need to store a list of log groups
   * per env along with the SDK object that has access to read from
   * that environment.
   *
   * Log groups already tracked for the environment are kept. Every log
   * group named in `logGroupNames` is (re)set to read from the monitor's
   * current start time, and the environment's SDK is replaced by `sdk`.
   */
  public addLogGroups(env: cxapi.Environment, sdk: SDK, logGroupNames: string[]): void {
    const awsEnv = `${env.account}:${env.region}`;
    const logGroupsStartTimes: { [logGroupName: string]: number } = {
      ...this.envsLogGroupsAccessSettings.get(awsEnv)?.logGroupsStartTimes,
    };
    for (const groupName of logGroupNames) {
      logGroupsStartTimes[groupName] = this.startTime;
    }
    this.envsLogGroupsAccessSettings.set(awsEnv, { sdk, logGroupsStartTimes });
  }

  /**
   * Names of all log groups currently tracked, across all environments
   */
  private logGroupNames(): string[] {
    return Array.from(this.envsLogGroupsAccessSettings.values())
      .flatMap((settings) => Object.keys(settings.logGroupsStartTimes));
  }

  /**
   * Queues the next poll, replacing any poll that is already queued.
   * Does nothing while the monitor is not active.
   */
  private scheduleNextTick(): void {
    if (!this.monitorId) {
      return;
    }
    this.cancelNextTick();
    this.nextTick = setTimeout(() => {
      this.nextTick = undefined;
      void this.tick();
    }, this.pollingInterval);
  }

  /**
   * Cancels the queued poll, if there is one
   */
  private cancelNextTick(): void {
    if (this.nextTick !== undefined) {
      clearTimeout(this.nextTick);
      this.nextTick = undefined;
    }
  }

  /**
   * One polling round: read the new events of all log groups, print them
   * and queue the next round. Read/print failures are reported through the
   * IoHelper and do not stop the polling loop.
   */
  private async tick(): Promise<void> {
    // excluding from codecoverage because this
    // doesn't always run (depends on timing)
    /* c8 ignore start */
    const monitorId = this.monitorId;
    if (!monitorId) {
      return;
    }
    /* c8 ignore stop */

    try {
      const events = flatten(await this.readNewEvents());

      // We might have been deactivated (and possibly activated again) while
      // the network call was in progress. These events then belong to a
      // session that is over: don't print them and don't keep polling.
      if (this.monitorId !== monitorId) {
        return;
      }

      for (const event of events) {
        await this.print(event);
      }
    } catch (e: any) {
      await this.ioHelper.notify(IO.CDK_TOOLKIT_E5035.msg(`Error occurred while monitoring logs: ${String(e)}`, { error: e }));
    }

    // Only the session that started this tick may continue its loop.
    if (this.monitorId !== monitorId) {
      return;
    }
    this.scheduleNextTick();
  }

  /**
   * Reads all new log events from a set of CloudWatch Log Groups
   * in parallel
   *
   * @returns one array of events per tracked log group
   */
  private async readNewEvents(): Promise<Array<Array<CloudWatchLogEvent>>> {
    const promises: Array<Promise<Array<CloudWatchLogEvent>>> = [];
    for (const settings of this.envsLogGroupsAccessSettings.values()) {
      for (const group of Object.keys(settings.logGroupsStartTimes)) {
        promises.push(this.readEventsFromLogGroup(settings, group));
      }
    }
    // Limited set of log groups
    // eslint-disable-next-line @cdklabs/promiseall-no-unbounded-parallelism
    return Promise.all(promises);
  }

  /**
   * Print out a cloudwatch event
   *
   * The message text is `[<log group>] <local time> <trimmed message>`;
   * the event itself is attached as the message payload.
   */
  private async print(event: CloudWatchLogEvent): Promise<void> {
    await this.ioHelper.notify(IO.CDK_TOOLKIT_I5033.msg(
      util.format(
        '[%s] %s %s',
        chalk.blue(event.logGroupName),
        chalk.yellow(event.timestamp.toLocaleTimeString()),
        event.message.trim(),
      ),
      event,
    ));
  }

  /**
   * Reads all new log events from a CloudWatch Log Group
   * starting at either the time the hotswap was triggered or
   * when the last event was read on the previous tick
   *
   * On success the log group's read position is advanced to just past the
   * newest event returned. If the log group does not exist
   * (`ResourceNotFoundException`) an empty list is returned and the read
   * position is left untouched; any other error is rethrown.
   */
  private async readEventsFromLogGroup(
    logGroupsAccessSettings: LogGroupsAccessSettings,
    logGroupName: string,
  ): Promise<Array<CloudWatchLogEvent>> {
    // Maximum number of events requested per log group and poll
    const maxEvents = 100;
    const events: CloudWatchLogEvent[] = [];

    // log events from some service are ingested faster than others
    // so we need to track the start/end time for each log group individually
    // to make sure that we process all events from each log group
    const startTime = logGroupsAccessSettings.logGroupsStartTimes[logGroupName] ?? this.startTime;
    let endTime = startTime;

    try {
      const response = await logGroupsAccessSettings.sdk.cloudWatchLogs().filterLogEvents({
        logGroupName: logGroupName,
        limit: maxEvents,
        startTime: startTime,
      });
      const filteredEvents = response.events ?? [];

      for (const event of filteredEvents) {
        // events without a message are skipped entirely
        if (event.message) {
          events.push({
            message: event.message,
            logGroupName,
            timestamp: event.timestamp ? new Date(event.timestamp) : new Date(),
          });
          if (event.timestamp && endTime < event.timestamp) {
            endTime = event.timestamp;
          }
        }
      }

      // As long as there are _any_ events in the log group `filterLogEvents` will return a nextToken.
      // This is true even if these events are before `startTime`. So if we have 100 events and a nextToken
      // then assume that we have hit the limit and let the user know some messages have been suppressed.
      // We are essentially showing them a sampling (10000 events printed out is not very useful)
      if (filteredEvents.length === maxEvents && response.nextToken) {
        events.push({
          message: '>>> `watch` shows only the first 100 log messages - the rest have been truncated...',
          logGroupName,
          timestamp: new Date(endTime),
        });
      }
    } catch (e: any) {
      // with Lambda functions the CloudWatch is not created
      // until something is logged, so just keep polling until
      // there is something to find
      if (e.name === 'ResourceNotFoundException') {
        return [];
      }
      throw e;
    }

    // Next poll starts right after the newest event seen in this one
    logGroupsAccessSettings.logGroupsStartTimes[logGroupName] = endTime + 1;
    return events;
  }
}