Skip to content

Commit

Permalink
Fixes log duplication issue (sematic-ai#722)
Browse files Browse the repository at this point in the history
A bug occurs when logs are already loading, and an accidental
simultaneous scroll event triggers another log pull. The second pull
will append the same logs twice.

According to the previous work, we can already sequentialize the pulls
using a queue. So the fix is to avoid the second pull being issued if we
discovered from the first pull that there would be no more logs.

This PR does such, and some extras:

1. Add more loggings to understand the async queuing and execution
details
2. A helper function `useRefFn` to avoid the initial value of `useRef`
being re-created multiple times across rendering. (see also
[here](facebook/react#14490))
3. Convert `AsyncInvocationQueue` from a function to a class to carry
more responsibilities
4. Updated unit test accordingly.

---------

Co-authored-by: tscurtu <[email protected]>
  • Loading branch information
chance-an and tscurtu authored Apr 7, 2023
1 parent feb95ff commit 2d7ea34
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 27 deletions.
18 changes: 18 additions & 0 deletions sematic/ui/packages/common/src/utils/hooks.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import { useRef } from 'react';

export function useRefFn<T>(initializer: () => T): T {
const instanceRef = useRef<T | null>(null)

function getInstance() {
let instance = instanceRef.current;
if (instance !== null) {
return instance;
}
// Lazy init
let newInstance = initializer();
instanceRef.current = newInstance;
return newInstance;
}

return getInstance();
}
72 changes: 58 additions & 14 deletions sematic/ui/packages/main/src/hooks/logHooks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ import useLatest from "react-use/lib/useLatest";
import { LogLineRequestResponse } from "../Payloads";
import { AsyncInvocationQueue, useLogger } from "src/utils";
import { useHttpClient } from "./httpHooks";
import { useRefFn } from "@sematic/common/src/utils/hooks";

const MAX_LINES = 2000;
const POLLING_INTERVAL = 5000;
export interface GetNextResult {
pulledLines: number
pulledLines?: number;
canceled?: boolean;
}

export enum DiagnosticReasons {
Expand Down Expand Up @@ -79,18 +81,56 @@ export function useLogStream(source: string, filterString: string) {
}
}, [source, setHasPulledData, hasMore, filterString, cursor, MAX_LINES, devLogger]);

const acquire = useRef(AsyncInvocationQueue());
const asyncInvocationManager = useRefFn(() => new AsyncInvocationQueue());

const abortController = useRefFn(() => new AbortController());

const getNextLatest = useLatest(getNext);
const hasMoreLatest = useLatest(hasMore);

// This is a function to call getNext by queuing the request.
// This is to avoid multiple getNext() calls being made at the same time.
const getNextWithQueue = useCallback(async (reason: DiagnosticReasons) => {
const release = await acquire.current();
const result = await getNextLatest.current(reason);
const ID = asyncInvocationManager.InstanceID;
devLogger(`[AsyncQ_${ID}][${reason}] Entering asyncQueue, attempting to acquire lock`);

const release = await asyncInvocationManager.acquire();
devLogger(`[AsyncQ_${ID}][${reason}] acquired.`);

if (hasMoreLatest.current === false) {
// There is no more data to fetch. No need to continue.
devLogger(`[AsyncQ_${ID}][${reason}] Since 'hasMore' is false, getNext queue will not continue.`);

return {
canceled: true,
};
}

if (abortController.signal.aborted) {
// The rendering thread is going away. No need to continue.
devLogger(`[AsyncQ_${ID}][${reason}] Canceled.`);

return {
canceled: true,
};
}
const result = await (new Promise<GetNextResult>((resolve) => {
setTimeout(async () => {
resolve(await getNextLatest.current(reason));
}, 0);
}));
release();
devLogger(`[AsyncQ_${ID}][${reason}] released.`);
return result;
},[acquire, getNextLatest]);
},[asyncInvocationManager, getNextLatest, hasMoreLatest, abortController, devLogger]);

useEffect(() => {
return () => {
if (!abortController.signal.aborted) {
abortController.abort();
}
};
},[abortController])

return { lines, isLoading, error, hasMore, logInfoMessage, getNext: getNextWithQueue, hasPulledData };
}
Expand Down Expand Up @@ -129,27 +169,31 @@ export function useAccumulateLogsUntilEnd(hasMore: boolean,
}

setIsLoading(true);
const {pulledLines} = await latestGetNext.current(DiagnosticReasons.ACCUMULATE);
const { pulledLines, canceled } = await latestGetNext.current(DiagnosticReasons.ACCUMULATE);

if (abortController.signal.aborted) {
break;
}
setIsLoading(false);

// The server shouldn't return NaN, but just in case and be defensive,
// we don't want to add NaN to the accumulatedLines.
if (!isNaN(pulledLines)) {
accumulatedLines += pulledLines;
setAccumulatedLines(accumulatedLines);
}

if (canceled !== true) {
// The server shouldn't return NaN, but just in case and be defensive,
// we don't want to add NaN to the accumulatedLines.
if (!isNaN(pulledLines!)) {
accumulatedLines += pulledLines!;
setAccumulatedLines(accumulatedLines);
} else {
devLogger('Encounter NaN for pulledLines, skip updating component state.')
}
}

// Yield to rendering cycles
await new Promise(
resolve => setTimeout(resolve, POLLING_INTERVAL)
);
}
setIsAccumulating(false);
}, [latestHasMore, latestGetNext]);
}, [latestHasMore, devLogger, latestGetNext]);

useEffect(() => {
// always cancel ongoing accumulation if the component will unmount
Expand Down
9 changes: 5 additions & 4 deletions sematic/ui/packages/main/src/tests/test_utils.cy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,23 @@ describe('abbreviatedUserName()', () => {
describe('AsyncInvocationQueue', () => {
it("should sequentialize function invocation using a queue", () => {
let asyncFunction: any;
let acquire: any;
let asyncController: AsyncInvocationQueue;
cy.clock();
cy.wait(0).then(async () => {
acquire = AsyncInvocationQueue();
asyncController = new AsyncInvocationQueue();
asyncFunction = cy.spy(async () => {
await new Promise(resolve => setTimeout(resolve, 100));
});
});
cy.wait(0).then(async () => {
setTimeout(async () => {
const release = await acquire();
const release = await asyncController.acquire();
await asyncFunction();
release();
}, 0);

setTimeout(async () => {
const release = await acquire();
const release = await asyncController.acquire();
await asyncFunction();
release();
}, 20);
Expand All @@ -50,6 +50,7 @@ describe('AsyncInvocationQueue', () => {
// test that at 50th millisecond, the function is executed only once
// because the second call is queued
expect(asyncFunction).to.have.callCount(1);
expect(asyncController.IsBusy).to.equal(true);
});
cy.wait(0);

Expand Down
35 changes: 26 additions & 9 deletions sematic/ui/packages/main/src/utils.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -175,29 +175,46 @@ export function durationSecondsToString(durationS: number) : string {
return final;
}

let ID = 0;

export function AsyncInvocationQueue() {
const queue: any[] = [];
export interface ReleaseHandle {
(): void;
}
export class AsyncInvocationQueue {
private queue: any[] = [];
private instanceID: number;

constructor() {
this.queue = [];
this.instanceID = ID++;
}

const acquire = async () => {
async acquire(): Promise<ReleaseHandle> {
let resolve: any;
const waitingPromise = new Promise((_resolve) => {
resolve = _resolve;
});
queue.push(waitingPromise);
this.queue.push(waitingPromise);

// Wait until the all the promises before this one have been resolved
while (queue.length !== 0) {
if (queue[0] === waitingPromise) {
// Wait until all the promises before this one have been resolved
while (this.queue.length !== 0) {
if (this.queue[0] === waitingPromise) {
break;
}
await queue.shift();
await this.queue.shift();
// sleep
await new Promise((resolve) => setTimeout(resolve, 50));
}

// The resolve function can be used to release to the next item in the queue
return resolve;
}
return acquire;

get InstanceID() {
return this.instanceID;
}

get IsBusy() {
return this.queue.length > 0;
}
}

0 comments on commit 2d7ea34

Please sign in to comment.