private _createJob()

in packages/angular_devkit/core/src/experimental/jobs/simple-scheduler.ts [286:484]


  /**
   * Build the `Job` object for a single job by wiring the given message buses to the Job's
   * public members (`input`, `output`, `getChannel()`, `ping()`, `stop()`, `state`, ...).
   *
   * Inbound side: every value pushed on the returned `input` subject is validated with the
   * handler's `inputV` validator; values that validate are forwarded to `inboundBus` as `Input`
   * messages, values that do not validate are dropped.
   *
   * Outbound side: `outboundBus` is followed by a synthetic `End` message (only when a handler
   * exists), filtered through `_filterJobOutboundMessages()`, used to keep the job `state` and
   * the channel subjects up to date, and has its `Output` messages validated with the handler's
   * `outputV` validator. A failed output validation errors the stream with
   * `JobOutputSchemaValidationError`; a `null` handler errors it with
   * `JobDoesNotExistException`.
   *
   * @param name Name of the job, used when reporting `JobDoesNotExistException`.
   * @param argument Argument of the job; exposed unchanged on the returned Job.
   * @param handler Observable of the job handler. An emitted `null` is treated as "the job does
   *     not exist".
   * @param inboundBus Observer receiving the `Input`, `Ping` and `Stop` messages produced by the
   *     returned Job.
   * @param outboundBus Observable of the messages coming out of the job.
   * @returns The Job facade built on top of the two buses.
   */
  private _createJob<A extends MinimumArgumentT, I extends MinimumInputT, O extends MinimumOutputT>(
    name: JobName,
    argument: A,
    handler: Observable<JobHandlerWithExtra | null>,
    inboundBus: Observer<JobInboundMessage<I>>,
    outboundBus: Observable<JobOutboundMessage<O>>,
  ): Job<A, I, O> {
    const schemaRegistry = this._schemaRegistry;

    // `channelsSubject` holds the writable end of each open channel; `channels` holds the
    // read-only observable handed out by `getChannel()`.
    const channelsSubject = new Map<string, Subject<JsonValue>>();
    const channels = new Map<string, Observable<JsonValue>>();

    let state = JobState.Queued;
    let pingId = 0;

    // Create the input channel by having a filter: inputs are validated one at a time (in
    // order) and only the valid ones reach the inbound bus.
    // NOTE(review): no error callback is registered on this subscription, so an error raised
    // while resolving the handler or validating is not handled here - confirm this is intended.
    const input = new Subject<JsonValue>();
    input
      .pipe(
        concatMap((message) =>
          handler.pipe(
            switchMap((resolvedHandler) => {
              if (resolvedHandler === null) {
                throw new JobDoesNotExistException(name);
              } else {
                return resolvedHandler.inputV.pipe(switchMap((validate) => validate(message)));
              }
            }),
          ),
        ),
        filter((result) => result.success),
        map((result) => result.data as I),
      )
      .subscribe((value) => inboundBus.next({ kind: JobInboundMessageKind.Input, value }));

    outboundBus = concat(
      outboundBus,
      // Add an End message at completion. This will be filtered out if the job actually sent an
      // End.
      handler.pipe(
        switchMap((resolvedHandler) => {
          if (resolvedHandler) {
            return of<JobOutboundMessage<O>>({
              kind: JobOutboundMessageKind.End,
              description: resolvedHandler.jobDescription,
            });
          } else {
            return EMPTY as Observable<JobOutboundMessage<O>>;
          }
        }),
      ),
    ).pipe(
      filter((message) => this._filterJobOutboundMessages(message, state)),
      // Update internal logic and Job<> members.
      tap({
        next: (message) => {
          // Update the state.
          state = this._updateState(message, state);

          switch (message.kind) {
            case JobOutboundMessageKind.ChannelCreate: {
              const maybeSubject = channelsSubject.get(message.name);
              // Only create a subject if none is registered. Entries are removed when a channel
              // completes or errors, so a channel closed on the other end is re-created here.
              if (!maybeSubject) {
                const s = new Subject<JsonValue>();
                channelsSubject.set(message.name, s);
                channels.set(message.name, s.asObservable());
              }
              break;
            }

            case JobOutboundMessageKind.ChannelMessage: {
              // Messages for a channel without a registered subject are ignored.
              const maybeSubject = channelsSubject.get(message.name);
              if (maybeSubject) {
                maybeSubject.next(message.message);
              }
              break;
            }

            case JobOutboundMessageKind.ChannelComplete: {
              const maybeSubject = channelsSubject.get(message.name);
              if (maybeSubject) {
                maybeSubject.complete();
                // Only the writable end is forgotten; the entry in `channels` is kept.
                channelsSubject.delete(message.name);
              }
              break;
            }

            case JobOutboundMessageKind.ChannelError: {
              const maybeSubject = channelsSubject.get(message.name);
              if (maybeSubject) {
                maybeSubject.error(message.error);
                // Only the writable end is forgotten; the entry in `channels` is kept.
                channelsSubject.delete(message.name);
              }
              break;
            }
          }
        },
        error: () => {
          // Any error reaching this point of the outbound stream marks the job as errored.
          state = JobState.Errored;
        },
      }),

      // Do output validation (might include default values so this might have side
      // effects). We keep all messages in order.
      concatMap((message) => {
        // Only Output messages are validated; everything else passes through untouched.
        if (message.kind !== JobOutboundMessageKind.Output) {
          return of(message);
        }

        return handler.pipe(
          switchMap((resolvedHandler) => {
            if (resolvedHandler === null) {
              throw new JobDoesNotExistException(name);
            } else {
              return resolvedHandler.outputV.pipe(
                switchMap((validate) => validate(message.value)),
                switchMap((output) => {
                  if (!output.success) {
                    throw new JobOutputSchemaValidationError(output.errors);
                  }

                  // NOTE(review): the validated data is stored under `output`, while the
                  // `output` stream below reads `value`. Consumers only see the validator's
                  // changes if it mutates `message.value` in place - confirm this is intended.
                  return of({
                    ...message,
                    output: output.data as O,
                  } as JobOutboundMessageOutput<O>);
                }),
              );
            }
          }),
        ) as Observable<JobOutboundMessage<O>>;
      }),
      _jobShare(),
    );

    // Values of the Output messages only; the most recent value is replayed to late subscribers.
    const output = outboundBus.pipe(
      filter((x) => x.kind === JobOutboundMessageKind.Output),
      map((x) => (x as JobOutboundMessageOutput<O>).value),
      shareReplay(1),
    );

    // Return the Job.
    return {
      // Exposed as a getter so callers always read the current value of the local `state`.
      get state() {
        return state;
      },
      argument,
      description: handler.pipe(
        switchMap((resolvedHandler) => {
          if (resolvedHandler === null) {
            throw new JobDoesNotExistException(name);
          } else {
            return of(resolvedHandler.jobDescription);
          }
        }),
      ),
      output,
      getChannel<T extends JsonValue>(
        name: JobName,
        schema: schema.JsonSchema = true,
      ): Observable<T> {
        let maybeObservable = channels.get(name);
        if (!maybeObservable) {
          // The channel was requested before the job created it: register the subject now so
          // that later ChannelMessage/ChannelComplete/ChannelError messages are routed to it.
          const s = new Subject<T>();
          const channel = s.asObservable();
          channelsSubject.set(name, (s as unknown) as Subject<JsonValue>);
          channels.set(name, channel);

          maybeObservable = channel;
        }

        return maybeObservable.pipe(
          // Keep the order of messages. Messages that do not validate against `schema` are
          // dropped.
          concatMap((message) => {
            return schemaRegistry.compile(schema).pipe(
              switchMap((validate) => validate(message)),
              filter((x) => x.success),
              map((x) => x.data as T),
            );
          }),
        );
      },
      ping() {
        // The Ping is sent when `ping()` is called; the returned observable emits nothing and
        // completes once the Pong carrying the same id is seen on the outbound bus.
        const id = pingId++;
        inboundBus.next({ kind: JobInboundMessageKind.Ping, id });

        return outboundBus.pipe(
          filter((x) => x.kind === JobOutboundMessageKind.Pong && x.id === id),
          first(),
          ignoreElements(),
        );
      },
      stop() {
        inboundBus.next({ kind: JobInboundMessageKind.Stop });
      },
      input,
      inboundBus,
      outboundBus,
    };
  }