request()

in packages/aws-appsync-subscription-link/src/subscription-handshake-link.ts [53:133]


    request(operation: Operation) {
        const {
            [this.subsInfoContextKey]: subsInfo,
            controlMessages: { [CONTROL_EVENTS_KEY]: controlEvents } = { [CONTROL_EVENTS_KEY]: undefined }
        } = operation.getContext();
        const {
            extensions: {
                subscription: { newSubscriptions, mqttConnections }
            } = { subscription: { newSubscriptions: {}, mqttConnections: [] } },
            errors = [],
        }: {
            extensions?: {
                subscription: SubscriptionExtension
            },
            errors: any[]
        } = subsInfo;

        if (errors && errors.length) {
            return new Observable(observer => {
                observer.error(new ApolloError({
                    errorMessage: 'Error during subscription handshake',
                    extraInfo: { errors },
                    graphQLErrors: errors
                }));

                return () => { };
            });
        }

        const newSubscriptionTopics = Object.keys(newSubscriptions).map(subKey => newSubscriptions[subKey].topic);
        const existingTopicsWithObserver = new Set(newSubscriptionTopics.filter(t => this.topicObservers.has(t)));
        const newTopics = new Set(newSubscriptionTopics.filter(t => !existingTopicsWithObserver.has(t)));

        return new Observable<FetchResult>(observer => {
            existingTopicsWithObserver.forEach(t => {
                this.topicObservers.get(t).add(observer);
                const anObserver = Array.from(this.topicObservers.get(t)).find(() => true);

                const [clientId] = Array.from(this.clientObservers).find(([, { observers }]) => observers.has(anObserver));
                this.clientObservers.get(clientId).observers.add(observer);
            });

            const newTopicsConnectionInfo = mqttConnections
                .filter(c => c.topics.some(t => newTopics.has(t)))
                .map(({ topics, ...rest }) => ({
                    ...rest,
                    topics: topics.filter(t => newTopics.has(t))
                } as MqttConnectionInfo));

            this.connectNewClients(newTopicsConnectionInfo, observer, operation);

            return () => {
                const clientsForCurrentObserver = Array.from(this.clientObservers).filter(([, { observers }]) => observers.has(observer));
                clientsForCurrentObserver.forEach(([clientId]) => this.clientObservers.get(clientId).observers.delete(observer));

                this.clientObservers.forEach(({ observers, client }) => {
                    if (observers.size === 0) {
                        if (client.isConnected()) {
                            client.disconnect();
                        }
                        this.clientObservers.delete(client.clientId);
                    }
                });
                this.clientObservers = new Map(
                    Array.from(this.clientObservers).filter(([, { observers }]) => observers.size > 0)
                );

                this.topicObservers.forEach(observers => observers.delete(observer));

                this.topicObservers = new Map(
                    Array.from(this.topicObservers)
                        .filter(([, observers]) => observers.size > 0)
                );
            };
        }).filter(data => {
            const { extensions: { controlMsgType = undefined } = {} } = data;
            const isControlMsg = typeof controlMsgType !== 'undefined';

            return controlEvents === true || !isControlMsg;
        });
    }