in packages/aws-appsync-subscription-link/src/subscription-handshake-link.ts [53:133]
request(operation: Operation) {
const {
[this.subsInfoContextKey]: subsInfo,
controlMessages: { [CONTROL_EVENTS_KEY]: controlEvents } = { [CONTROL_EVENTS_KEY]: undefined }
} = operation.getContext();
const {
extensions: {
subscription: { newSubscriptions, mqttConnections }
} = { subscription: { newSubscriptions: {}, mqttConnections: [] } },
errors = [],
}: {
extensions?: {
subscription: SubscriptionExtension
},
errors: any[]
} = subsInfo;
if (errors && errors.length) {
return new Observable(observer => {
observer.error(new ApolloError({
errorMessage: 'Error during subscription handshake',
extraInfo: { errors },
graphQLErrors: errors
}));
return () => { };
});
}
const newSubscriptionTopics = Object.keys(newSubscriptions).map(subKey => newSubscriptions[subKey].topic);
const existingTopicsWithObserver = new Set(newSubscriptionTopics.filter(t => this.topicObservers.has(t)));
const newTopics = new Set(newSubscriptionTopics.filter(t => !existingTopicsWithObserver.has(t)));
return new Observable<FetchResult>(observer => {
existingTopicsWithObserver.forEach(t => {
this.topicObservers.get(t).add(observer);
const anObserver = Array.from(this.topicObservers.get(t)).find(() => true);
const [clientId] = Array.from(this.clientObservers).find(([, { observers }]) => observers.has(anObserver));
this.clientObservers.get(clientId).observers.add(observer);
});
const newTopicsConnectionInfo = mqttConnections
.filter(c => c.topics.some(t => newTopics.has(t)))
.map(({ topics, ...rest }) => ({
...rest,
topics: topics.filter(t => newTopics.has(t))
} as MqttConnectionInfo));
this.connectNewClients(newTopicsConnectionInfo, observer, operation);
return () => {
const clientsForCurrentObserver = Array.from(this.clientObservers).filter(([, { observers }]) => observers.has(observer));
clientsForCurrentObserver.forEach(([clientId]) => this.clientObservers.get(clientId).observers.delete(observer));
this.clientObservers.forEach(({ observers, client }) => {
if (observers.size === 0) {
if (client.isConnected()) {
client.disconnect();
}
this.clientObservers.delete(client.clientId);
}
});
this.clientObservers = new Map(
Array.from(this.clientObservers).filter(([, { observers }]) => observers.size > 0)
);
this.topicObservers.forEach(observers => observers.delete(observer));
this.topicObservers = new Map(
Array.from(this.topicObservers)
.filter(([, observers]) => observers.size > 0)
);
};
}).filter(data => {
const { extensions: { controlMsgType = undefined } = {} } = data;
const isControlMsg = typeof controlMsgType !== 'undefined';
return controlEvents === true || !isControlMsg;
});
}