export function useLogStream()

in x-pack/platform/plugins/shared/logs_shared/public/containers/logs/log_stream/index.ts [54:267]


/**
 * React hook that loads log entries for a log view and keeps pagination state
 * (entries, cursors, `hasMoreBefore` / `hasMoreAfter`, `lastLoadedTime`).
 *
 * The returned object contains the current state plus:
 * - `fetchEntries`: resets the state and loads an initial page, around
 *   `center` when it is set, otherwise the page before `'last'`.
 * - `fetchPreviousEntries` / `fetchNextEntries`: load one more page before the
 *   top cursor / after the bottom cursor.
 * - `fetchNewestEntries`: like `fetchEntries` without `center`, but the
 *   existing entries are only replaced once the response has arrived.
 * - `isReloading` / `isLoadingMore`: flags derived from the running requests.
 *
 * @param props.logViewReference forwarded unchanged to the fetch hooks
 * @param props.startTimestamp start of the time range forwarded to the fetch hooks
 * @param props.endTimestamp end of the time range forwarded to the fetch hooks
 * @param props.query forwarded to the fetch hooks via a cached copy (see below)
 * @param props.center position used by `fetchEntries` for the "around" request
 * @param props.columns forwarded to the fetch hooks as `columnOverrides`
 * @throws {Error} from `fetchPreviousEntries` / `fetchNextEntries` when more
 *   entries are flagged as available but the corresponding cursor is `null`.
 */
export function useLogStream({
  logViewReference,
  startTimestamp,
  endTimestamp,
  query,
  center,
  columns,
}: LogStreamProps) {
  const [state, setState] = useSetState<LogStreamState>(INITIAL_STATE);
  const [resetOnSuccess, setResetOnSuccess] = useState<boolean>(false);

  // Ensure the pagination keeps working when the timerange gets extended
  const prevStartTimestamp = usePrevious(startTimestamp);
  const prevEndTimestamp = usePrevious(endTimestamp);

  // Keep a cached copy of `query` that is only replaced when `isEqual` reports
  // a difference, so `commonFetchArguments` below is not recomputed for every
  // new-but-equal `query` value.
  const [cachedQuery, setCachedQuery] = useState(query);
  if (!isEqual(query, cachedQuery)) {
    setCachedQuery(query);
  }

  useEffect(() => {
    // Explicit nullish check: a previous timestamp of `0` is a valid value and
    // must not be treated as "no previous value".
    if (prevStartTimestamp != null && prevStartTimestamp > startTimestamp) {
      setState({ hasMoreBefore: true });
    }
  }, [prevStartTimestamp, startTimestamp, setState]);

  useEffect(() => {
    // Same nullish check as above, for the end of the range.
    if (prevEndTimestamp != null && prevEndTimestamp < endTimestamp) {
      setState({ hasMoreAfter: true });
    }
  }, [prevEndTimestamp, endTimestamp, setState]);

  // Arguments shared by the "around", "before" and "after" fetch hooks.
  const commonFetchArguments = useMemo(
    () => ({
      logViewReference,
      startTimestamp,
      endTimestamp,
      query: cachedQuery,
      columnOverrides: columns,
    }),
    [columns, endTimestamp, cachedQuery, logViewReference, startTimestamp]
  );

  const {
    fetchLogEntriesAround,
    isRequestRunning: isLogEntriesAroundRequestRunning,
    logEntriesAroundSearchResponses$,
  } = useFetchLogEntriesAround(commonFetchArguments);

  // "Around" responses replace the entries and both cursors once the combined
  // result is no longer partial.
  useSubscription(logEntriesAroundSearchResponses$, {
    next: ({ before, after, combined }) => {
      if ((before.response.data != null || after?.response.data != null) && !combined.isPartial) {
        setState((currentState) => {
          // When a reset was requested, build on the initial state instead of
          // the current one.
          const baseState = resetOnSuccess ? INITIAL_STATE : currentState;
          return {
            ...baseState,
            entries: combined.entries,
            hasMoreAfter: combined.hasMoreAfter ?? baseState.hasMoreAfter,
            hasMoreBefore: combined.hasMoreBefore ?? baseState.hasMoreBefore,
            bottomCursor: combined.bottomCursor,
            topCursor: combined.topCursor,
            lastLoadedTime: new Date(),
          };
        });
        if (resetOnSuccess) {
          setResetOnSuccess(false);
        }
      }
    },
  });

  const {
    fetchLogEntriesBefore,
    isRequestRunning: isLogEntriesBeforeRequestRunning,
    logEntriesBeforeSearchResponse$,
  } = useFetchLogEntriesBefore(commonFetchArguments);

  // "Before" responses are prepended to the existing entries.
  useSubscription(logEntriesBeforeSearchResponse$, {
    next: ({ response: { data, isPartial } }) => {
      if (data != null && !isPartial) {
        setState((currentState) => {
          const baseState = resetOnSuccess ? INITIAL_STATE : currentState;
          return {
            ...baseState,
            entries: [...data.entries, ...baseState.entries],
            hasMoreBefore: data.hasMoreBefore ?? baseState.hasMoreBefore,
            // The top cursor moves with the new page; the bottom cursor is only
            // taken from the response when none is set yet.
            topCursor: data.topCursor ?? baseState.topCursor,
            bottomCursor: baseState.bottomCursor ?? data.bottomCursor,
            lastLoadedTime: new Date(),
          };
        });
        if (resetOnSuccess) {
          setResetOnSuccess(false);
        }
      }
    },
  });

  const fetchPreviousEntries = useCallback<FetchPageCallback>(
    (params) => {
      if (state.topCursor === null && state.hasMoreBefore) {
        throw new Error(
          'useLogStream: Cannot fetch previous entries.\nIt seems there are more entries available, but no cursor is set.\nEnsure you have called `fetchEntries` at least once.'
        );
      }

      // Nothing more to load, unless the caller forces the request.
      if (!state.hasMoreBefore && !params?.force) {
        return;
      }

      if (state.topCursor !== null) {
        fetchLogEntriesBefore(state.topCursor, {
          size: LOG_ENTRIES_CHUNK_SIZE,
          extendTo: params?.extendTo,
        });
      }
    },
    [fetchLogEntriesBefore, state.topCursor, state.hasMoreBefore]
  );

  const {
    fetchLogEntriesAfter,
    isRequestRunning: isLogEntriesAfterRequestRunning,
    logEntriesAfterSearchResponse$,
  } = useFetchLogEntriesAfter(commonFetchArguments);

  // "After" responses are appended to the existing entries.
  useSubscription(logEntriesAfterSearchResponse$, {
    next: ({ response: { data, isPartial } }) => {
      if (data != null && !isPartial) {
        setState((currentState) => {
          const baseState = resetOnSuccess ? INITIAL_STATE : currentState;
          return {
            ...baseState,
            entries: [...baseState.entries, ...data.entries],
            hasMoreAfter: data.hasMoreAfter ?? baseState.hasMoreAfter,
            // The bottom cursor moves with the new page; the top cursor is only
            // taken from the response when none is set yet.
            topCursor: baseState.topCursor ?? data.topCursor,
            bottomCursor: data.bottomCursor ?? baseState.bottomCursor,
            lastLoadedTime: new Date(),
          };
        });
        if (resetOnSuccess) {
          setResetOnSuccess(false);
        }
      }
    },
  });

  const fetchNextEntries = useCallback<FetchPageCallback>(
    (params) => {
      if (state.bottomCursor === null && state.hasMoreAfter) {
        throw new Error(
          'useLogStream: Cannot fetch next entries. No cursor is set.\nEnsure you have called `fetchEntries` at least once.'
        );
      }

      // Nothing more to load, unless the caller forces the request.
      if (!state.hasMoreAfter && !params?.force) {
        return;
      }

      if (state.bottomCursor !== null) {
        fetchLogEntriesAfter(state.bottomCursor, {
          size: LOG_ENTRIES_CHUNK_SIZE,
          extendTo: params?.extendTo,
        });
      }
    },
    [fetchLogEntriesAfter, state.bottomCursor, state.hasMoreAfter]
  );

  // Resets the state immediately, then loads the initial page.
  const fetchEntries = useCallback(() => {
    setState(INITIAL_STATE);

    if (center) {
      fetchLogEntriesAround(center, LOG_ENTRIES_CHUNK_SIZE);
    } else {
      fetchLogEntriesBefore('last', { size: LOG_ENTRIES_CHUNK_SIZE });
    }
  }, [center, fetchLogEntriesAround, fetchLogEntriesBefore, setState]);

  // Specialized version of `fetchEntries` for streaming.
  // - Reset the entries _after_ the network request succeeds.
  // - Ignores `center`.
  const fetchNewestEntries = useCallback(() => {
    setResetOnSuccess(true);
    fetchLogEntriesBefore('last', { size: LOG_ENTRIES_CHUNK_SIZE });
  }, [fetchLogEntriesBefore]);

  // Reloading: an "around" request is running, or a "before" request is
  // running while neither cursor is set.
  const isReloading = useMemo(
    () =>
      isLogEntriesAroundRequestRunning ||
      (state.bottomCursor == null && state.topCursor == null && isLogEntriesBeforeRequestRunning),
    [
      isLogEntriesAroundRequestRunning,
      isLogEntriesBeforeRequestRunning,
      state.bottomCursor,
      state.topCursor,
    ]
  );

  // Loading more: a "before" or an "after" request is running.
  const isLoadingMore = useMemo(
    () => isLogEntriesBeforeRequestRunning || isLogEntriesAfterRequestRunning,
    [isLogEntriesAfterRequestRunning, isLogEntriesBeforeRequestRunning]
  );

  return {
    ...state,
    fetchEntries,
    fetchNextEntries,
    fetchPreviousEntries,
    fetchNewestEntries,
    isLoadingMore,
    isReloading,
  };
}