in x-pack/platform/plugins/shared/logs_shared/public/containers/logs/log_stream/index.ts [54:267]
/**
 * React hook that keeps a pageable window of log entries in local state and
 * exposes callbacks to (re)load it.
 *
 * The three fetch hooks (`around`, `before`, `after`) all receive the same
 * memoized arguments: the log view reference, the time range, the cached query
 * and `columns` (forwarded as `columnOverrides`). Non-partial responses that
 * carry data are merged into the state together with a fresh `lastLoadedTime`.
 *
 * @param props.logViewReference - forwarded unchanged to every fetch hook.
 * @param props.startTimestamp - start of the time range; when it moves earlier
 *   than on the previous render, `hasMoreBefore` is set to `true` again.
 * @param props.endTimestamp - end of the time range; when it moves later than
 *   on the previous render, `hasMoreAfter` is set to `true` again.
 * @param props.query - forwarded to the fetch hooks; a copy is cached and only
 *   replaced when `isEqual` reports a difference.
 * @param props.center - when truthy, `fetchEntries` loads entries around it.
 * @param props.columns - forwarded to the fetch hooks as `columnOverrides`.
 * @returns the current state plus:
 *   - `fetchEntries`: resets the state to `INITIAL_STATE`, then loads a chunk
 *     around `center`, or the chunk before `'last'` when there is no `center`.
 *   - `fetchPreviousEntries` / `fetchNextEntries`: load one more chunk from the
 *     top / bottom cursor. They throw an `Error` when the respective cursor is
 *     `null` while the matching `hasMore*` flag is set, and do nothing when the
 *     flag is unset unless `params.force` is truthy.
 *   - `fetchNewestEntries`: like the cursor-less `fetchEntries`, but the state
 *     is only reset when the response is merged, and `center` is ignored.
 *   - `isReloading` / `isLoadingMore`: derived request-running flags.
 */
export function useLogStream({
  logViewReference,
  startTimestamp,
  endTimestamp,
  query,
  center,
  columns,
}: LogStreamProps) {
  const [state, setState] = useSetState<LogStreamState>(INITIAL_STATE);
  const { topCursor, bottomCursor, hasMoreBefore, hasMoreAfter } = state;
  const [resetOnSuccess, setResetOnSuccess] = useState<boolean>(false);

  // Track the previous time range bounds so that pagination keeps working when
  // the time range gets extended.
  const prevStartTimestamp = usePrevious(startTimestamp);
  const prevEndTimestamp = usePrevious(endTimestamp);

  // Only swap the cached query when it differs according to `isEqual`, so the
  // memoized fetch arguments below are keyed on `cachedQuery` rather than on
  // the `query` prop directly.
  const [cachedQuery, setCachedQuery] = useState(query);
  const queryChanged = !isEqual(query, cachedQuery);
  if (queryChanged) {
    setCachedQuery(query);
  }

  useEffect(() => {
    const startMovedEarlier = prevStartTimestamp && prevStartTimestamp > startTimestamp;
    if (startMovedEarlier) {
      setState({ hasMoreBefore: true });
    }
  }, [prevStartTimestamp, startTimestamp, setState]);

  useEffect(() => {
    const endMovedLater = prevEndTimestamp && prevEndTimestamp < endTimestamp;
    if (endMovedLater) {
      setState({ hasMoreAfter: true });
    }
  }, [prevEndTimestamp, endTimestamp, setState]);

  const commonFetchArguments = useMemo(() => {
    return {
      logViewReference,
      startTimestamp,
      endTimestamp,
      query: cachedQuery,
      columnOverrides: columns,
    };
  }, [columns, endTimestamp, cachedQuery, logViewReference, startTimestamp]);

  // Merges a successful response into the state. While `resetOnSuccess` is set
  // the merge starts from `INITIAL_STATE` instead of the current state, and the
  // flag is cleared right after the update has been queued.
  const commitPage = (buildPatch: (base: LogStreamState) => Partial<LogStreamState>) => {
    setState((current) => {
      const base = resetOnSuccess ? INITIAL_STATE : current;
      return { ...base, ...buildPatch(base) };
    });
    if (resetOnSuccess) {
      setResetOnSuccess(false);
    }
  };

  const {
    fetchLogEntriesAround,
    isRequestRunning: isLogEntriesAroundRequestRunning,
    logEntriesAroundSearchResponses$,
  } = useFetchLogEntriesAround(commonFetchArguments);

  useSubscription(logEntriesAroundSearchResponses$, {
    next: (responses) => {
      const { before, after, combined } = responses;
      const hasNoData = before.response.data == null && after?.response.data == null;
      // Skip responses without any data as well as partial results.
      if (hasNoData || combined.isPartial) {
        return;
      }
      // An "around" response replaces the entries and both cursors wholesale.
      commitPage((base) => ({
        entries: combined.entries,
        hasMoreAfter: combined.hasMoreAfter ?? base.hasMoreAfter,
        hasMoreBefore: combined.hasMoreBefore ?? base.hasMoreBefore,
        bottomCursor: combined.bottomCursor,
        topCursor: combined.topCursor,
        lastLoadedTime: new Date(),
      }));
    },
  });

  const {
    fetchLogEntriesBefore,
    isRequestRunning: isLogEntriesBeforeRequestRunning,
    logEntriesBeforeSearchResponse$,
  } = useFetchLogEntriesBefore(commonFetchArguments);

  useSubscription(logEntriesBeforeSearchResponse$, {
    next: ({ response }) => {
      const { data, isPartial } = response;
      if (data == null || isPartial) {
        return;
      }
      // Prepend the new page; the response's top cursor wins, while an already
      // known bottom cursor is kept.
      commitPage((base) => ({
        entries: [...data.entries, ...base.entries],
        hasMoreBefore: data.hasMoreBefore ?? base.hasMoreBefore,
        topCursor: data.topCursor ?? base.topCursor,
        bottomCursor: base.bottomCursor ?? data.bottomCursor,
        lastLoadedTime: new Date(),
      }));
    },
  });

  const fetchPreviousEntries = useCallback<FetchPageCallback>(
    (params) => {
      if (topCursor === null && hasMoreBefore) {
        throw new Error(
          'useLogStream: Cannot fetch previous entries.\nIt seems there are more entries available, but no cursor is set.\nEnsure you have called `fetchEntries` at least once.'
        );
      }
      // Without more entries a request is only issued when explicitly forced,
      // and never without a cursor to start from.
      const mayLoad = hasMoreBefore || params?.force;
      if (!mayLoad || topCursor === null) {
        return;
      }
      fetchLogEntriesBefore(topCursor, {
        size: LOG_ENTRIES_CHUNK_SIZE,
        extendTo: params?.extendTo,
      });
    },
    [fetchLogEntriesBefore, topCursor, hasMoreBefore]
  );

  const {
    fetchLogEntriesAfter,
    isRequestRunning: isLogEntriesAfterRequestRunning,
    logEntriesAfterSearchResponse$,
  } = useFetchLogEntriesAfter(commonFetchArguments);

  useSubscription(logEntriesAfterSearchResponse$, {
    next: ({ response }) => {
      const { data, isPartial } = response;
      if (data == null || isPartial) {
        return;
      }
      // Append the new page; the response's bottom cursor wins, while an
      // already known top cursor is kept.
      commitPage((base) => ({
        entries: [...base.entries, ...data.entries],
        hasMoreAfter: data.hasMoreAfter ?? base.hasMoreAfter,
        topCursor: base.topCursor ?? data.topCursor,
        bottomCursor: data.bottomCursor ?? base.bottomCursor,
        lastLoadedTime: new Date(),
      }));
    },
  });

  const fetchNextEntries = useCallback<FetchPageCallback>(
    (params) => {
      if (bottomCursor === null && hasMoreAfter) {
        throw new Error(
          'useLogStream: Cannot fetch next entries. No cursor is set.\nEnsure you have called `fetchEntries` at least once.'
        );
      }
      // Without more entries a request is only issued when explicitly forced,
      // and never without a cursor to start from.
      const mayLoad = hasMoreAfter || params?.force;
      if (!mayLoad || bottomCursor === null) {
        return;
      }
      fetchLogEntriesAfter(bottomCursor, {
        size: LOG_ENTRIES_CHUNK_SIZE,
        extendTo: params?.extendTo,
      });
    },
    [fetchLogEntriesAfter, bottomCursor, hasMoreAfter]
  );

  const fetchEntries = useCallback(() => {
    // Drop everything that is currently loaded before requesting a new window.
    setState(INITIAL_STATE);
    if (!center) {
      fetchLogEntriesBefore('last', { size: LOG_ENTRIES_CHUNK_SIZE });
      return;
    }
    fetchLogEntriesAround(center, LOG_ENTRIES_CHUNK_SIZE);
  }, [center, fetchLogEntriesAround, fetchLogEntriesBefore, setState]);

  // Specialized version of `fetchEntries` for streaming.
  // - Reset the entries _after_ the network request succeeds.
  // - Ignores `center`.
  const fetchNewestEntries = useCallback(() => {
    setResetOnSuccess(true);
    fetchLogEntriesBefore('last', { size: LOG_ENTRIES_CHUNK_SIZE });
  }, [fetchLogEntriesBefore]);

  const isReloading = useMemo(() => {
    // A "before" request only counts as a reload while no cursor is set yet.
    const noCursorYet = bottomCursor == null && topCursor == null;
    return isLogEntriesAroundRequestRunning || (noCursorYet && isLogEntriesBeforeRequestRunning);
  }, [
    isLogEntriesAroundRequestRunning,
    isLogEntriesBeforeRequestRunning,
    bottomCursor,
    topCursor,
  ]);

  const isLoadingMore = useMemo(() => {
    return isLogEntriesBeforeRequestRunning || isLogEntriesAfterRequestRunning;
  }, [isLogEntriesAfterRequestRunning, isLogEntriesBeforeRequestRunning]);

  return {
    ...state,
    fetchEntries,
    fetchNextEntries,
    fetchPreviousEntries,
    fetchNewestEntries,
    isLoadingMore,
    isReloading,
  };
}