Skip to content

Commit

Permalink
fix: refactor queue to prevent multiples streams
Browse files Browse the repository at this point in the history
  • Loading branch information
miksuh-dev committed Mar 24, 2023
1 parent 45b2714 commit 0413061
Show file tree
Hide file tree
Showing 4 changed files with 170 additions and 140 deletions.
242 changes: 134 additions & 108 deletions server/common/playlist/internal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,58 @@ import {
createStream as createYoutubeStream,
getVideoInfo,
} from "../youtube-dl";
import { PlayError } from "./types";
import { ProcessQueueItem, ProcessQueueItemStatus } from "./types";

const MAX_RETRIES = 3;

let playing: PlayingSong | undefined;
const processingQueue = new Array<ProcessQueueItem>();
let stream: Readable | undefined;
let endTimeout: NodeJS.Timeout | undefined;
let playError: PlayError | undefined;

const processQueue = async (caller: ProcessQueueItem) => {
if (processingQueue.length === 0) return;

const current = processingQueue.at(0);
if (!current) return;

if (current.song.id !== caller.song.id) return;

await current.callback();

// Clear rest of the queue as we have decided next song
processingQueue.splice(1);
};

export const addSongToQueue = async (song: Song) => {
const item: ProcessQueueItem = {
status: ProcessQueueItemStatus.pending,
song,
retryCount: 0,
callback: async function () {
try {
await playSong.call(this);
} catch (e) {
if (e instanceof Error) {
this.status = ProcessQueueItemStatus.error;

void onSongError.call(this, e.message);
}
}
},
};

processingQueue.push(item);

await processQueue(item);
};

export const removeSongFromQueue = (song: Song) => {
processingQueue.forEach((item, index) => {
if (item.song.id === song.id) {
processingQueue.splice(index, 1);
}
});
};

export const onSongEnd = async (song: Song) => {
await prisma.song.update({
Expand All @@ -40,59 +84,42 @@ export const onSongEnd = async (song: Song) => {
ee.emit(`onUpdate`, { song: { remove: [song.id] } });

client.voiceConnection.stopStream();

removeSongFromQueue(song);

const nextSong = await getNextSong();
if (nextSong) {
await playSong(nextSong);
await addSongToQueue(nextSong);
}
};

const handleSongErrorTimeout = async (song: Song) => {
// Ignore as error is no longer relevant
if (!playError || playing) {
return;
}
async function onSongError(this: ProcessQueueItem, error: string) {
try {
if (stream) {
stream.destroy();
}

if (playError.retryCount > MAX_RETRIES) {
return onSongEnd(song);
}
this.retryCount += 1;

return playSong(song);
};
const shouldStop = this.retryCount > MAX_RETRIES;

const onPlayError = (song: Song, error: string) => {
// No longer relevant
if (playing?.id) {
playError = undefined;
return;
}
sendErrorMessage(shouldStop ? "event.error" : "event.retry", {
item: this.song.title,
error,
});

if (!playError || playError.id !== song.id) {
playError = {
id: song.id,
retryCount: 0,
};
}
if (shouldStop) {
this.status = ProcessQueueItemStatus.skipped;
await onSongEnd(this.song);
}

if (stream) {
stream.destroy();
setTimeout(() => {
void processQueue(this);
}, 5000);
} catch (e) {
console.log("e", e);
}

playError.retryCount += 1;

sendErrorMessage(
playError.retryCount > MAX_RETRIES ? "event.error" : "event.retry",
{ item: song.title, error }
);

setTimeout(() => {
handleSongErrorTimeout(song).catch((e) => {
if (e instanceof Error) {
onPlayError(song, e.message);
}
console.log("e", e);
});
}, 5000);
};
}

const createStream = async (song: Song) => {
if (song.type === SourceType.SONG) {
Expand All @@ -106,61 +133,61 @@ const createStream = async (song: Song) => {
throw new Error("Unknown song type");
};

const onPlayStart = async (song: Song) => {
let currentSong = undefined;
async function onPlayStart(this: ProcessQueueItem) {
try {
this.status = ProcessQueueItemStatus.processing;

if (song.type === SourceType.SONG) {
const videoInfo = await getVideoInfo(song);
if (!videoInfo) {
throw new Error("Failed to get video info");
if (this.status !== ProcessQueueItemStatus.processing) {
throw new Error("Failed to update song status");
}

currentSong = setCurrentSong({
...song,
startedAt: DateTime.now(),
duration: videoInfo.duration,
sendMessage(`event.source.${this.song.type}.start`, {
item: this.song.title,
});

const secondsLeft = currentSong.startedAt
.plus({ seconds: videoInfo.duration })
.diffNow("seconds").seconds;
if (this.song.type === SourceType.SONG) {
const videoInfo = await getVideoInfo(this.song);
if (!videoInfo) {
throw new Error("Failed to get video info");
}

if (endTimeout) {
clearTimeout(endTimeout);
}
endTimeout = setTimeout(() => {
onSongEnd(song).catch((e) => {
if (e instanceof Error) {
onPlayError(song, e.message);
}
console.log("e", e);
});
}, secondsLeft * 1000);
} else {
currentSong = setCurrentSong({
...song,
startedAt: DateTime.now(),
});
}
this.song.startedAt = DateTime.now();
this.song.duration = videoInfo.duration;

await prisma.song.update({
where: {
id: currentSong.id,
},
data: {
started: true,
},
});
const secondsLeft = this.song.startedAt
.plus({ seconds: videoInfo.duration })
.diffNow("seconds").seconds;

sendMessage(`event.source.${song.type}.start`, { item: song.title });
if (endTimeout) clearTimeout(endTimeout);

return currentSong;
};
endTimeout = setTimeout(() => {
onSongEnd(this.song).catch((e) => {
console.log("e", e);
});
}, secondsLeft * 1000);
} else {
this.song.startedAt = DateTime.now();
}

export const playSong = async (song: Song) => {
try {
if (playing) return;
await prisma.song.update({
where: {
id: this.song.id,
},
data: {
started: true,
},
});

return this.song;
} catch (e) {
if (e instanceof Error) {
void onSongError.call(this, e.message);
}
}
}

export async function playSong(this: ProcessQueueItem) {
try {
if (endTimeout) {
clearTimeout(endTimeout);
}
Expand All @@ -169,56 +196,55 @@ export const playSong = async (song: Song) => {
stream.destroy();
}

stream = await createStream(song);
stream = await createStream(this.song);

ee.emit(`onUpdate`, {
song: { setPlaying: this.song },
});

let started = false;
stream.on("data", () => {
if (!started) {
started = true;

onPlayStart(song).catch((e) => {
if (e instanceof Error) {
onPlayError(song, e.message);
}
onPlayStart.call(this).catch((e) => {
console.log("e", e);
});
}
});

stream.on("error", (e) => {
if (e instanceof Error) {
onPlayError(song, e.message);
void onSongError.call(this, e.message);
}
});

client.voiceConnection
.playStream(stream, "0")
.on("error", (message: string) => {
onPlayError(song, message);
void onSongError.call(this, message);
});
} catch (e) {
if (e instanceof Error) {
onPlayError(song, e.message);
void onSongError.call(this, e.message);
}
}
};
}

export const getCurrentSong = (): PlayingSong | undefined => {
return playing;
};
const item = processingQueue.at(0);

export const setCurrentSong = (song: PlayingSong) => {
playing = song;

ee.emit(`onUpdate`, {
song: { setPlaying: playing },
});
if (!item || item.status !== ProcessQueueItemStatus.processing) {
return undefined;
}

return playing;
return item.song;
};

export const stopCurrentSong = () => {
playing = undefined;
if (stream) {
stream.destroy();
}

client.voiceConnection.stopStream();
clearTimeout(endTimeout);
Expand Down
29 changes: 29 additions & 0 deletions server/common/playlist/types.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,33 @@
import { Song } from "@prisma/client";
import { PlayingSong } from "types/app";

export interface PlayError {
id: number;
retryCount: number;
}

export enum ProcessQueueItemStatus {
pending = "pending",
processing = "processing",
skipped = "skipped",
error = "error",
done = "done",
}

interface NonActiveItem {
status:
| ProcessQueueItemStatus.pending
| ProcessQueueItemStatus.error
| ProcessQueueItemStatus.skipped;
song: Song;
}

interface ActiveItem {
status: ProcessQueueItemStatus.processing;
song: PlayingSong;
}

export type ProcessQueueItem = (NonActiveItem | ActiveItem) & {
retryCount: number;
callback: () => Promise<void>;
};
Loading

0 comments on commit 0413061

Please sign in to comment.