export default function useRealtimeQuery()

in projects/deliberation_at_scale/packages/frontend/hooks/useRealtimeQuery.ts [51:373]


export default function useRealtimeQuery<DataType>(queryResult: QueryResult<DataType, any>, options?: UseNestedLiveQueryOptions): QueryResult<DataType> {
    const {
        channelPrefix = 'realtime',
        schemaName = 'public',
        tableEventsLookup = defaultTableEventsLookup,
        maxNestedDepth = 9999,
        autoRefetch = ENABLE_AUTO_QUERY_REFETCH,
        autoRefetchIntervalMs = AUTO_QUERY_REFETCH_INTERVAL_MS,
        enableSubscriptions = true,
    } = options ?? {};
    const {
        data, loading, refetch,
        client: apolloClient,
        observable: { query, variables: queryVariables },
    } = queryResult;
    const { cache } = apolloClient;
    const [trackedSubscription, setTrackedSubscription] = useState<RealtimeChannel | null>(null);
    const autoFetchId = useMemo(() => {
        return objectHash({
            query: JSON.stringify(query.definitions),
            variables: queryVariables,
        });
    }, [query, queryVariables]);
    const lastAutoFetchedAt = useAppSelector((state) => state.fetches.autoFetchedAtLookup[autoFetchId]);
    const trackedSubscriptionState = trackedSubscription?.state;
    const dispatch = useDispatch();

    // handle auto refetching
    useEffect(() => {
        if (!autoRefetch || !autoRefetchIntervalMs) {
            return;
        }

        const refetchInterval = setInterval(() => {
            const shouldRefetch = !lastAutoFetchedAt || Math.abs(dayjs().diff(lastAutoFetchedAt, 'millisecond')) > autoRefetchIntervalMs;

            if (!shouldRefetch || loading) {

                return;
            }

            dispatch(updateAutoFetchedAt({
                autoFetchId,
            }));
            refetch(queryVariables);
        }, autoRefetchIntervalMs);

        return () => {
            clearInterval(refetchInterval);
        };
    }, [autoFetchId, autoRefetch, autoRefetchIntervalMs, dispatch, lastAutoFetchedAt, loading, queryVariables, refetch]);

    // handle disconnects
    useEffect(() => {
        if (trackedSubscriptionState !== 'errored') {
            return;
        }

        console.error('Subscription has errored, refetching and re-subscribing...');
        console.error('This was or the following query:', query);
        refetch();
    }, [refetch, trackedSubscriptionState, query]);

    useEffect(() => {

        // guard: skip subscriptions when loading
        if (loading) {
            return;
        }

        const candidateRowIdsLookup: Record<string, string[]> = {};
        const extractCandidateNodes = (node: any, depth: number) => {
            const typeName: string = node?.__typename;
            const id = node?.id;
            const isTrackableConnection = !!typeName && typeName.endsWith('Connection');
            const isTrackableNode = !!typeName && !!id;

            // guard: skip when depth is too high
            if (depth > maxNestedDepth) {
                return;
            }

            // only if node is trackable by ID and typename add to the pool
            if (isTrackableNode) {
                candidateRowIdsLookup[typeName] = [
                    ...(candidateRowIdsLookup[typeName] ?? []),
                    id,
                ];
            }

            // only if this might be a connection which is potentially empty without any records
            // we still want to track the connection when a first entry is added
            if (isTrackableConnection) {
                const tableTypeName = typeName.replaceAll('Connection', '');
                candidateRowIdsLookup[tableTypeName] = [];
            }

            // attempt to track its properties / entries
            if (Array.isArray(node)) {
                node.map((nodeEntry) => {
                    extractCandidateNodes(nodeEntry, depth + 1);
                });
            } else if (!!node && typeof node == 'object') {
                Object.values(node).map((nodeValue) => {
                    extractCandidateNodes(nodeValue, depth + 1);
                });
            }
        };
        // eslint-disable-next-line no-prototype-builtins
        const shouldHandleAllTables = tableEventsLookup.hasOwnProperty(allTablesWildcard);
        const getTableEvents = (tableName: string) => {
            return tableEventsLookup?.[tableName] ?? defaultTableEventsLookup?.[allTablesWildcard];
        };
        const shouldHandleOperation = (operationsKey: 'listenOperations' | 'refetchOperations', defaultOperations: TableOperation[], operation: REALTIME_POSTGRES_CHANGES_LISTEN_EVENT, tableName: string) => {
            const tableEvents = getTableEvents(tableName);
            // eslint-disable-next-line no-prototype-builtins
            const shouldHandleTable = tableEventsLookup.hasOwnProperty(tableName) || shouldHandleAllTables;
            const operations = tableEvents?.[operationsKey] ?? defaultOperations;
            const shouldHandleOperation = operations.includes(operation) || operations.includes('*');

            return enableSubscriptions && shouldHandleTable && shouldHandleOperation;
        };
        const shouldListenToOperation = (operation: REALTIME_POSTGRES_CHANGES_LISTEN_EVENT, tableName: string) => {
            return shouldHandleOperation('listenOperations', defaultListenOperations, operation, tableName);
        };
        const shouldRefetchOnOperation = (operation: REALTIME_POSTGRES_CHANGES_LISTEN_EVENT, tableName: string) => {
            return shouldHandleOperation('refetchOperations', defaultRefetchOperations, operation, tableName);
        };

        // extract all the candidate nodes grouped by table name (aka typename)
        // this creates a lookup table to be handled for all the subscriptions looking like:
        // { messages: ['id1', 'id2', 'id3', 'id4'], topics: ['id1', 'id2'] }
        extractCandidateNodes(data, 0);

        const channelName = `${channelPrefix}_${objectHash({ options, candidateRowIdsLookup })}`;
        const subscription = supabaseClient.channel(channelName);
        const tableNames = Object.keys(candidateRowIdsLookup);
        let isTrackingSomething = false;

        tableNames.map((tableName) => {
            const tableEvents = getTableEvents(tableName);
            const rowIds = candidateRowIdsLookup[tableName];
            const joinedRowIds = rowIds.join(',');
            const nodeIdFilter = `id=in.(${joinedRowIds})`;

            // check which operations we should listen to
            const shouldListenToAllOperations = shouldListenToOperation(REALTIME_POSTGRES_CHANGES_LISTEN_EVENT.ALL, tableName);
            const shouldListenToInsert = shouldListenToOperation(REALTIME_POSTGRES_CHANGES_LISTEN_EVENT.INSERT, tableName) || shouldListenToAllOperations;
            const shouldListenToUpdate = shouldListenToOperation(REALTIME_POSTGRES_CHANGES_LISTEN_EVENT.UPDATE, tableName) || shouldListenToAllOperations;
            const shouldListenToDelete = shouldListenToOperation(REALTIME_POSTGRES_CHANGES_LISTEN_EVENT.DELETE, tableName) || shouldListenToAllOperations;
            const shouldListenToNone = !shouldListenToInsert && !shouldListenToUpdate && !shouldListenToDelete;

            // check on which operations the whole query needs to be rerun
            const shouldRefetchOnAllOperations = shouldRefetchOnOperation(REALTIME_POSTGRES_CHANGES_LISTEN_EVENT.ALL, tableName);
            const shouldRefetchOnInsert = shouldRefetchOnOperation(REALTIME_POSTGRES_CHANGES_LISTEN_EVENT.INSERT, tableName) || shouldRefetchOnAllOperations;
            const shouldRefetchOnUpdate = shouldRefetchOnOperation(REALTIME_POSTGRES_CHANGES_LISTEN_EVENT.UPDATE, tableName) || shouldRefetchOnAllOperations;
            const shouldRefectchOnDelete = shouldRefetchOnOperation(REALTIME_POSTGRES_CHANGES_LISTEN_EVENT.DELETE, tableName) || shouldRefetchOnAllOperations;

            // delete handlers
            const shouldEvictOnDelete = tableEvents?.evictOnDelete ?? defaultEvictOnDelete;

            // insert handlers
            const appendOnInsertEdgePaths = tableEvents?.appendOnInsertEdgePaths ?? defaultAppendOnInsertEdgePaths;
            const shouldInsertForEdgePaths = !isEmpty(appendOnInsertEdgePaths);

            // any filters on the operation listeners that might be overriden or can be the default
            const listenFilters = tableEvents?.listenFilters;
            const insertFilter = listenFilters?.INSERT ?? undefined;
            const updateFilter = listenFilters?.UPDATE ?? nodeIdFilter;
            const deleteFilter = listenFilters?.DELETE ?? nodeIdFilter;

            const getNodeId = (rowId: string) => {
                return `${tableName}:${rowId}`;
            };

            // guard: skip this entry when none should be tracked
            if (shouldListenToNone || !ENABLE_REALTIME_SUBSCRIPTIONS) {
                return;
            }

            // if not set this skips the subscription
            isTrackingSomething = true;

            if (shouldListenToInsert) {
                subscription.on(
                    "postgres_changes",
                    {
                        event: "INSERT",
                        schema: schemaName,
                        table: tableName,
                        filter: insertFilter,
                    },
                    (payload) => {
                        const newRow = payload.new;
                        // console.log('INSERT', tableName, newRow);
                        if (shouldRefetchOnInsert) {
                            refetch();
                        }

                        if (shouldInsertForEdgePaths) {
                            appendOnInsertEdgePaths.map((path) => {
                                const edgePath = `${path}.edges`;
                                const currentEdges = get(data, edgePath);
                                const newNode = {
                                    __typename: tableName,
                                    ...newRow,
                                };
                                const newEdge = {
                                    __typename: `${tableName}Edge`,
                                    node: newNode
                                };

                                // guard: ensure this is an array
                                if (!Array.isArray(currentEdges)) {
                                    return;
                                }

                                const cacheMutationData = set({ __typename:  `${tableName}Connection` }, edgePath, [
                                    ...currentEdges,
                                    newEdge,
                                ]);

                                cache.writeQuery({
                                    query,
                                    variables: queryVariables,
                                    // eslint-disable-next-line @typescript-eslint/ban-ts-comment
                                    // @ts-ignore
                                    data: cacheMutationData,
                                });
                            });
                        }
                    }
                );
            }

            if (shouldListenToUpdate) {
                subscription.on(
                    "postgres_changes",
                    {
                        event: "UPDATE",
                        schema: schemaName,
                        table: tableName,
                        filter: updateFilter,
                    },
                    (payload) => {
                        const newRow = payload.new;
                        const fragment = `
                            fragment UpdatedRow on ${tableName} {
                                ${Object.keys(newRow).join(", ")}
                            }
                        `;
                        // console.log('UPDATE', tableName, newRow);

                        cache.writeFragment({
                            id: getNodeId(newRow?.id),
                            data: newRow,
                            fragment: gql(fragment),
                        });
                        if (shouldRefetchOnUpdate) {
                            refetch();
                        }
                    }
                );
            }

            if (shouldListenToDelete) {
                subscription.on(
                    "postgres_changes",
                    {
                        event: "DELETE",
                        schema: schemaName,
                        table: tableName,
                        filter: deleteFilter,
                    },
                    (payload) => {
                        const { id: deletedId } = payload.old;

                        if (shouldRefectchOnDelete) {
                            refetch();
                        }

                        if (shouldEvictOnDelete && deletedId) {
                            const normalizedId = cache.identify({
                                id: deletedId,
                                __typename: tableName,
                            });

                            cache.evict({ id: normalizedId });
                        }
                    }
                );
            }
        });

        // guard: only subscribe when anything is tracked
        if (!isTrackingSomething) {
            return;
        }

        // subscribe to all the requested tables and nodes
        subscription.subscribe((status) => {

            if (status === 'SUBSCRIBED') {
                console.log(`Subscription successfully subscribed with status: ${status}`);
                setTrackedSubscription(() => {
                    return subscription;
                });
            } else if (status === 'CHANNEL_ERROR' || status === 'TIMED_OUT') {
                console.error(`Subscription failed to subscribe and gave status: ${status}`);
            }
        });

        return () => {
            setTrackedSubscription(() => {
                return null;
            });
            subscription.unsubscribe();
        };
    // eslint-disable-next-line react-hooks/exhaustive-deps
    }, [channelPrefix, JSON.stringify(data), loading, maxNestedDepth, JSON.stringify(options), queryVariables, refetch, schemaName, setTrackedSubscription, JSON.stringify(tableEventsLookup)]);

    return queryResult;
}