Skip to content

Commit

Permalink
Merge pull request #1374 from canalplus/feat/attach-worker-prom
Browse files Browse the repository at this point in the history
Make `attachWorker` return a promise
  • Loading branch information
peaBerberian authored Feb 5, 2024
2 parents 1b2bd49 + b2c8716 commit cf0cd1b
Show file tree
Hide file tree
Showing 5 changed files with 148 additions and 14 deletions.
59 changes: 51 additions & 8 deletions src/core/api/public_api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import {
IErrorType,
MediaError,
} from "../../errors";
import WorkerInitializationError from "../../errors/worker_initialization_error";
import features, {
addFeatures,
IFeature,
Expand All @@ -54,7 +55,11 @@ import Manifest, {
ManifestMetadataFormat,
createRepresentationFilterFromFnString,
} from "../../manifest";
import { MainThreadMessageType } from "../../multithread_types";
import {
IWorkerMessage,
MainThreadMessageType,
WorkerMessageType,
} from "../../multithread_types";
import {
IAudioRepresentation,
IAudioRepresentationsSwitchingMode,
Expand Down Expand Up @@ -435,10 +440,18 @@ class Player extends EventEmitter<IPublicAPIEvent> {
*/
public attachWorker(
workerSettings : IWorkerSettings
) : void {
if (!hasWebassembly) {
log.warn("API: Cannot rely on a WebWorker: WebAssembly unavailable");
} else {
) : Promise<void> {
return new Promise((res, rej) => {
if (typeof Worker !== "function") {
log.warn("API: Cannot rely on a WebWorker: Worker API unavailable");
return rej(new WorkerInitializationError("INCOMPATIBLE_ERROR",
"Worker unavailable"));
}
if (!hasWebassembly) {
log.warn("API: Cannot rely on a WebWorker: WebAssembly unavailable");
return rej(new WorkerInitializationError("INCOMPATIBLE_ERROR",
"WebAssembly unavailable"));
}
if (typeof workerSettings.workerUrl === "string") {
this._priv_worker = new Worker(workerSettings.workerUrl);
} else {
Expand All @@ -448,10 +461,40 @@ class Player extends EventEmitter<IPublicAPIEvent> {
}

this._priv_worker.onerror = (evt: ErrorEvent) => {
this._priv_worker = null;
log.error("Unexpected worker error",
if (this._priv_worker !== null) {
this._priv_worker.terminate();
this._priv_worker = null;
}
log.error("API: Unexpected worker error",
evt.error instanceof Error ? evt.error : undefined);
rej(new WorkerInitializationError("UNKNOWN_ERROR",
"Unexpected Worker \"error\" event"));
};
const handleInitMessages = (msg: MessageEvent) => {
const msgData = msg.data as unknown as IWorkerMessage;
if (msgData.type === WorkerMessageType.InitError) {
log.warn("API: Processing InitError worker message: detaching worker");
if (this._priv_worker !== null) {
this._priv_worker.removeEventListener("message", handleInitMessages);
this._priv_worker.terminate();
this._priv_worker = null;
}
rej(
new WorkerInitializationError(
"SETUP_ERROR",
"Worker parser initialization failed: " + msgData.value.errorMessage
)
);
} else if (msgData.type === WorkerMessageType.InitSuccess) {
log.info("API: InitSuccess received from worker.");
if (this._priv_worker !== null) {
this._priv_worker.removeEventListener("message", handleInitMessages);
}
res();
}
};
this._priv_worker.addEventListener("message", handleInitMessages);

log.debug("---> Sending To Worker:", MainThreadMessageType.Init);
this._priv_worker.postMessage({
type: MainThreadMessageType.Init,
Expand All @@ -478,7 +521,7 @@ class Player extends EventEmitter<IPublicAPIEvent> {
},
});
}, this._destroyCanceller.signal);
}
});
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1031,6 +1031,11 @@ export default class MultiThreadContentInitializer extends ContentInitializer {
break;
}

case WorkerMessageType.InitSuccess:
case WorkerMessageType.InitError:
// Should already be handled by the API
break;

default:
assertUnreachable(msgData);
}
Expand Down
20 changes: 14 additions & 6 deletions src/core/init/multithread/worker/worker_portal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,20 @@ export default function initializeWorkerPortal() {
const diffWorker = Date.now() - performance.now();
mainThreadTimestampDiff.setValueIfChanged(diffWorker - diffMain);
updateLoggerLevel(msg.value.logLevel, msg.value.sendBackLogs);
dashWasmParser.initialize({ wasmUrl: msg.value.dashWasmUrl }).catch((err) => {
const error = err instanceof Error ?
err.toString() :
"Unknown Error";
log.error("Worker: Could not initialize DASH_WASM parser", error);
});
dashWasmParser.initialize({ wasmUrl: msg.value.dashWasmUrl }).then(
() => {
sendMessage({ type: WorkerMessageType.InitSuccess,
value: null });
}, (err) => {
const error = err instanceof Error ?
err.toString() :
"Unknown Error";
log.error("Worker: Could not initialize DASH_WASM parser", error);
sendMessage({ type: WorkerMessageType.InitError,
value: { errorMessage: error,
kind: "dashWasmInitialization" } });

});

if (!msg.value.hasVideo || msg.value.hasMseInWorker) {
contentPreparer.disposeCurrentContent();
Expand Down
36 changes: 36 additions & 0 deletions src/errors/worker_initialization_error.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import errorMessage from "./error_message";

type IWorkerInitializationErrorCode = "UNKNOWN_ERROR" |
"SETUP_ERROR" |
"INCOMPATIBLE_ERROR";

/**
* Error linked to the WebWorker initialization.
*
* @class WorkerInitializationError
* @extends Error
*/
export default class WorkerInitializationError extends Error {
public readonly name : "WorkerInitializationError";
public readonly type : "WORKER_INITIALIZATION_ERROR";
public readonly message : string;
public readonly code : IWorkerInitializationErrorCode;

/**
* @param {string} code
* @param {string} message
*/
constructor(
code : IWorkerInitializationErrorCode,
message : string
) {
super();
// @see https://stackoverflow.com/questions/41102060/typescript-extending-error-class
Object.setPrototypeOf(this, WorkerInitializationError.prototype);

this.name = "WorkerInitializationError";
this.type = "WORKER_INITIALIZATION_ERROR";
this.code = code;
this.message = errorMessage(this.code, message);
}
}
42 changes: 42 additions & 0 deletions src/multithread_types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,44 @@ export type ISentError = ISerializedNetworkError |
ISerializedEncryptedMediaError |
ISerializedOtherError;

/**
* Message sent by the WebWorker when its initialization, started implicitely
* as soon as the `new Worker` call was made for it, has finished and succeeded.
*
* Once that message has been received, you can ensure that no
* `IInitErrorWorkerMessage` will ever be received for the same worker.
*
* Note that receiving this message is not a requirement before preparing and
* loading a content, both initialization and content loading can be started in
* parallel.
*/
export interface IInitSuccessWorkerMessage {
type: WorkerMessageType.InitSuccess;
value: null;
}

/**
* Message sent by the WebWorker when its initialization, started implicitely
* as soon as the `new Worker` call was made for it, has finished and failed.
*
* Once that message has been received, you can ensure that no
* `IInitErrorWorkerMessage` will ever be received for the same worker.
*
* Note that you may received this message while preparing and/or loading a
* content, both initialization and content loading can be started in
* parallel.
* As such, this message may be coupled with a content error.
*/
export interface IInitErrorWorkerMessage {
type: WorkerMessageType.InitError;
value: {
/** A string describing the error encountered. */
errorMessage: string;

kind: "dashWasmInitialization";
};
}

export interface INeedsBufferFlushWorkerMessage {
type: WorkerMessageType.NeedsBufferFlush;
contentId: string;
Expand Down Expand Up @@ -883,6 +921,8 @@ export const enum WorkerMessageType {
EndOfStream = "end-of-stream",
Error = "error",
InbandEvent = "inband-event",
InitError = "init-error",
InitSuccess = "init-success",
InterruptEndOfStream = "stop-end-of-stream",
InterruptMediaSourceDurationUpdate = "stop-media-source-duration",
LockedStream = "locked-stream",
Expand Down Expand Up @@ -921,6 +961,8 @@ export type IWorkerMessage = IAbortBufferWorkerMessage |
IEndOfStreamWorkerMessage |
IErrorWorkerMessage |
IInbandEventWorkerMessage |
IInitSuccessWorkerMessage |
IInitErrorWorkerMessage |
IInterruptMediaSourceDurationWorkerMessage |
ILockedStreamWorkerMessage |
ILogMessageWorkerMessage |
Expand Down

0 comments on commit cf0cd1b

Please sign in to comment.