in packages/angular_devkit/core/src/experimental/jobs/simple-scheduler.ts [286:484]
private _createJob<A extends MinimumArgumentT, I extends MinimumInputT, O extends MinimumOutputT>(
name: JobName,
argument: A,
handler: Observable<JobHandlerWithExtra | null>,
inboundBus: Observer<JobInboundMessage<I>>,
outboundBus: Observable<JobOutboundMessage<O>>,
): Job<A, I, O> {
const schemaRegistry = this._schemaRegistry;
const channelsSubject = new Map<string, Subject<JsonValue>>();
const channels = new Map<string, Observable<JsonValue>>();
let state = JobState.Queued;
let pingId = 0;
// Create the input channel by having a filter.
const input = new Subject<JsonValue>();
input
.pipe(
concatMap((message) =>
handler.pipe(
switchMap((handler) => {
if (handler === null) {
throw new JobDoesNotExistException(name);
} else {
return handler.inputV.pipe(switchMap((validate) => validate(message)));
}
}),
),
),
filter((result) => result.success),
map((result) => result.data as I),
)
.subscribe((value) => inboundBus.next({ kind: JobInboundMessageKind.Input, value }));
outboundBus = concat(
outboundBus,
// Add an End message at completion. This will be filtered out if the job actually send an
// End.
handler.pipe(
switchMap((handler) => {
if (handler) {
return of<JobOutboundMessage<O>>({
kind: JobOutboundMessageKind.End,
description: handler.jobDescription,
});
} else {
return EMPTY as Observable<JobOutboundMessage<O>>;
}
}),
),
).pipe(
filter((message) => this._filterJobOutboundMessages(message, state)),
// Update internal logic and Job<> members.
tap(
(message) => {
// Update the state.
state = this._updateState(message, state);
switch (message.kind) {
case JobOutboundMessageKind.ChannelCreate: {
const maybeSubject = channelsSubject.get(message.name);
// If it doesn't exist or it's closed on the other end.
if (!maybeSubject) {
const s = new Subject<JsonValue>();
channelsSubject.set(message.name, s);
channels.set(message.name, s.asObservable());
}
break;
}
case JobOutboundMessageKind.ChannelMessage: {
const maybeSubject = channelsSubject.get(message.name);
if (maybeSubject) {
maybeSubject.next(message.message);
}
break;
}
case JobOutboundMessageKind.ChannelComplete: {
const maybeSubject = channelsSubject.get(message.name);
if (maybeSubject) {
maybeSubject.complete();
channelsSubject.delete(message.name);
}
break;
}
case JobOutboundMessageKind.ChannelError: {
const maybeSubject = channelsSubject.get(message.name);
if (maybeSubject) {
maybeSubject.error(message.error);
channelsSubject.delete(message.name);
}
break;
}
}
},
() => {
state = JobState.Errored;
},
),
// Do output validation (might include default values so this might have side
// effects). We keep all messages in order.
concatMap((message) => {
if (message.kind !== JobOutboundMessageKind.Output) {
return of(message);
}
return handler.pipe(
switchMap((handler) => {
if (handler === null) {
throw new JobDoesNotExistException(name);
} else {
return handler.outputV.pipe(
switchMap((validate) => validate(message.value)),
switchMap((output) => {
if (!output.success) {
throw new JobOutputSchemaValidationError(output.errors);
}
return of({
...message,
output: output.data as O,
} as JobOutboundMessageOutput<O>);
}),
);
}
}),
) as Observable<JobOutboundMessage<O>>;
}),
_jobShare(),
);
const output = outboundBus.pipe(
filter((x) => x.kind == JobOutboundMessageKind.Output),
map((x) => (x as JobOutboundMessageOutput<O>).value),
shareReplay(1),
);
// Return the Job.
return {
get state() {
return state;
},
argument,
description: handler.pipe(
switchMap((handler) => {
if (handler === null) {
throw new JobDoesNotExistException(name);
} else {
return of(handler.jobDescription);
}
}),
),
output,
getChannel<T extends JsonValue>(
name: JobName,
schema: schema.JsonSchema = true,
): Observable<T> {
let maybeObservable = channels.get(name);
if (!maybeObservable) {
const s = new Subject<T>();
channelsSubject.set(name, (s as unknown) as Subject<JsonValue>);
channels.set(name, s.asObservable());
maybeObservable = s.asObservable();
}
return maybeObservable.pipe(
// Keep the order of messages.
concatMap((message) => {
return schemaRegistry.compile(schema).pipe(
switchMap((validate) => validate(message)),
filter((x) => x.success),
map((x) => x.data as T),
);
}),
);
},
ping() {
const id = pingId++;
inboundBus.next({ kind: JobInboundMessageKind.Ping, id });
return outboundBus.pipe(
filter((x) => x.kind === JobOutboundMessageKind.Pong && x.id == id),
first(),
ignoreElements(),
);
},
stop() {
inboundBus.next({ kind: JobInboundMessageKind.Stop });
},
input,
inboundBus,
outboundBus,
};
}