From 2d7ea34fb2a5dde92f0c8860dbbf0baa9efacfa0 Mon Sep 17 00:00:00 2001 From: Chance An Date: Fri, 7 Apr 2023 12:44:44 -0700 Subject: [PATCH] Fixes log duplication issue (#722) A bug occurs when logs are already loading, and an accidental simultaneous scroll event triggers another log pull. The second pull will append the same logs twice. According to the previous work, we can already sequentialize the pulls using a queue. So the fix is to avoid the second pull being issued if we discovered from the first pull that there would be no more logs. This PR does such, and some extras: 1. Add more loggings to understand the async queuing and execution details 2. A helper function `useRefFn` to avoid the initial value of `useRef` being re-created multiple times across rendering. (see also [here](https://github.com/facebook/react/issues/14490)) 3. Convert `AsyncInvocationQueue` from a function to a class to carry more responsibilities 4. Updated unit test accordingly. --------- Co-authored-by: tscurtu --- sematic/ui/packages/common/src/utils/hooks.ts | 18 +++++ .../ui/packages/main/src/hooks/logHooks.ts | 72 +++++++++++++++---- .../packages/main/src/tests/test_utils.cy.ts | 9 +-- sematic/ui/packages/main/src/utils.tsx | 35 ++++++--- 4 files changed, 107 insertions(+), 27 deletions(-) create mode 100644 sematic/ui/packages/common/src/utils/hooks.ts diff --git a/sematic/ui/packages/common/src/utils/hooks.ts b/sematic/ui/packages/common/src/utils/hooks.ts new file mode 100644 index 000000000..e71ed18e4 --- /dev/null +++ b/sematic/ui/packages/common/src/utils/hooks.ts @@ -0,0 +1,18 @@ +import { useRef } from 'react'; + +export function useRefFn(initializer: () => T): T { + const instanceRef = useRef(null) + + function getInstance() { + let instance = instanceRef.current; + if (instance !== null) { + return instance; + } + // Lazy init + let newInstance = initializer(); + instanceRef.current = newInstance; + return newInstance; + } + + return getInstance(); +} diff --git a/sematic/ui/packages/main/src/hooks/logHooks.ts b/sematic/ui/packages/main/src/hooks/logHooks.ts index 837e4970c..40c19e363 100644 --- a/sematic/ui/packages/main/src/hooks/logHooks.ts +++ b/sematic/ui/packages/main/src/hooks/logHooks.ts @@ -5,11 +5,13 @@ import useLatest from "react-use/lib/useLatest"; import { LogLineRequestResponse } from "../Payloads"; import { AsyncInvocationQueue, useLogger } from "src/utils"; import { useHttpClient } from "./httpHooks"; +import { useRefFn } from "@sematic/common/src/utils/hooks"; const MAX_LINES = 2000; const POLLING_INTERVAL = 5000; export interface GetNextResult { - pulledLines: number + pulledLines?: number; + canceled?: boolean; } export enum DiagnosticReasons { @@ -79,18 +81,56 @@ export function useLogStream(source: string, filterString: string) { } }, [source, setHasPulledData, hasMore, filterString, cursor, MAX_LINES, devLogger]); - const acquire = useRef(AsyncInvocationQueue()); + const asyncInvocationManager = useRefFn(() => new AsyncInvocationQueue()); + + const abortController = useRefFn(() => new AbortController()); const getNextLatest = useLatest(getNext); + const hasMoreLatest = useLatest(hasMore); // This is a function to call getNext by queuing the request. // This is to avoid multiple getNext() calls being made at the same time. const getNextWithQueue = useCallback(async (reason: DiagnosticReasons) => { - const release = await acquire.current(); - const result = await getNextLatest.current(reason); + const ID = asyncInvocationManager.InstanceID; + devLogger(`[AsyncQ_${ID}][${reason}] Entering asyncQueue, attempting to acquire lock`); + + const release = await asyncInvocationManager.acquire(); + devLogger(`[AsyncQ_${ID}][${reason}] acquired.`); + + if (hasMoreLatest.current === false) { + // There is no more data to fetch. No need to continue. + devLogger(`[AsyncQ_${ID}][${reason}] Since 'hasMore' is false, getNext queue will not continue.`); + + return { + canceled: true, + }; + } + + if (abortController.signal.aborted) { + // The rendering thread is going away. No need to continue. + devLogger(`[AsyncQ_${ID}][${reason}] Canceled.`); + + return { + canceled: true, + }; + } + const result = await (new Promise((resolve) => { + setTimeout(async () => { + resolve(await getNextLatest.current(reason)); + }, 0); + })); release(); + devLogger(`[AsyncQ_${ID}][${reason}] released.`); return result; - },[acquire, getNextLatest]); + },[asyncInvocationManager, getNextLatest, hasMoreLatest, abortController, devLogger]); + + useEffect(() => { + return () => { + if (!abortController.signal.aborted) { + abortController.abort(); + } + }; + },[abortController]) return { lines, isLoading, error, hasMore, logInfoMessage, getNext: getNextWithQueue, hasPulledData }; } @@ -129,27 +169,31 @@ export function useAccumulateLogsUntilEnd(hasMore: boolean, } setIsLoading(true); - const {pulledLines} = await latestGetNext.current(DiagnosticReasons.ACCUMULATE); + const { pulledLines, canceled } = await latestGetNext.current(DiagnosticReasons.ACCUMULATE); if (abortController.signal.aborted) { break; } setIsLoading(false); - - // The server shouldn't return NaN, but just in case and be defensive, - // we don't want to add NaN to the accumulatedLines. - if (!isNaN(pulledLines)) { - accumulatedLines += pulledLines; - setAccumulatedLines(accumulatedLines); - } + if (canceled !== true) { + // The server shouldn't return NaN, but just in case and be defensive, + // we don't want to add NaN to the accumulatedLines. + if (!isNaN(pulledLines!)) { + accumulatedLines += pulledLines!; + setAccumulatedLines(accumulatedLines); + } else { + devLogger('Encounter NaN for pulledLines, skip updating component state.') + } + } + // Yield to rendering cycles await new Promise( resolve => setTimeout(resolve, POLLING_INTERVAL) ); } setIsAccumulating(false); - }, [latestHasMore, latestGetNext]); + }, [latestHasMore, devLogger, latestGetNext]); useEffect(() => { // always cancel ongoing accumulation if the component will unmount diff --git a/sematic/ui/packages/main/src/tests/test_utils.cy.ts b/sematic/ui/packages/main/src/tests/test_utils.cy.ts index f06abd206..e2c56f2c8 100644 --- a/sematic/ui/packages/main/src/tests/test_utils.cy.ts +++ b/sematic/ui/packages/main/src/tests/test_utils.cy.ts @@ -20,23 +20,23 @@ describe('abbreviatedUserName()', () => { describe('AsyncInvocationQueue', () => { it("should sequentialize function invocation using a queue", () => { let asyncFunction: any; - let acquire: any; + let asyncController: AsyncInvocationQueue; cy.clock(); cy.wait(0).then(async () => { - acquire = AsyncInvocationQueue(); + asyncController = new AsyncInvocationQueue(); asyncFunction = cy.spy(async () => { await new Promise(resolve => setTimeout(resolve, 100)); }); }); cy.wait(0).then(async () => { setTimeout(async () => { - const release = await acquire(); + const release = await asyncController.acquire(); await asyncFunction(); release(); }, 0); setTimeout(async () => { - const release = await acquire(); + const release = await asyncController.acquire(); await asyncFunction(); release(); }, 20); @@ -50,6 +50,7 @@ describe('AsyncInvocationQueue', () => { // test that at 50th millisecond, the function is executed only once // because the second call is queued expect(asyncFunction).to.have.callCount(1); + expect(asyncController.IsBusy).to.equal(true); }); cy.wait(0); diff --git a/sematic/ui/packages/main/src/utils.tsx b/sematic/ui/packages/main/src/utils.tsx index 41190c4e4..14d352a34 100644 --- a/sematic/ui/packages/main/src/utils.tsx +++ b/sematic/ui/packages/main/src/utils.tsx @@ -175,23 +175,33 @@ export function durationSecondsToString(durationS: number) : string { return final; } +let ID = 0; -export function AsyncInvocationQueue() { - const queue: any[] = []; +export interface ReleaseHandle { + (): void; +} +export class AsyncInvocationQueue { + private queue: any[] = []; + private instanceID: number; + + constructor() { + this.queue = []; + this.instanceID = ID++; + } - const acquire = async () => { + async acquire(): Promise { let resolve: any; const waitingPromise = new Promise((_resolve) => { resolve = _resolve; }); - queue.push(waitingPromise); + this.queue.push(waitingPromise); - // Wait until the all the promises before this one have been resolved - while (queue.length !== 0) { - if (queue[0] === waitingPromise) { + // Wait until all the promises before this one have been resolved + while (this.queue.length !== 0) { + if (this.queue[0] === waitingPromise) { break; } - await queue.shift(); + await this.queue.shift(); // sleep await new Promise((resolve) => setTimeout(resolve, 50)); } @@ -199,5 +209,12 @@ export function AsyncInvocationQueue() { // The resolve function can be used to release to the next item in the queue return resolve; } - return acquire; + + get InstanceID() { + return this.instanceID; + } + + get IsBusy() { + return this.queue.length > 0; + } }