diff --git a/server/common/playlist/internal.ts b/server/common/playlist/internal.ts index 0c3a14b..95886af 100644 --- a/server/common/playlist/internal.ts +++ b/server/common/playlist/internal.ts @@ -14,14 +14,58 @@ import { createStream as createYoutubeStream, getVideoInfo, } from "../youtube-dl"; -import { PlayError } from "./types"; +import { ProcessQueueItem, ProcessQueueItemStatus } from "./types"; const MAX_RETRIES = 3; -let playing: PlayingSong | undefined; +const processingQueue = new Array(); let stream: Readable | undefined; let endTimeout: NodeJS.Timeout | undefined; -let playError: PlayError | undefined; + +const processQueue = async (caller: ProcessQueueItem) => { + if (processingQueue.length === 0) return; + + const current = processingQueue.at(0); + if (!current) return; + + if (current.song.id !== caller.song.id) return; + + await current.callback(); + + // Clear rest of the queue as we have decided next song + processingQueue.splice(1); +}; + +export const addSongToQueue = async (song: Song) => { + const item: ProcessQueueItem = { + status: ProcessQueueItemStatus.pending, + song, + retryCount: 0, + callback: async function () { + try { + await playSong.call(this); + } catch (e) { + if (e instanceof Error) { + this.status = ProcessQueueItemStatus.error; + + void onSongError.call(this, e.message); + } + } + }, + }; + + processingQueue.push(item); + + await processQueue(item); +}; + +export const removeSongFromQueue = (song: Song) => { + processingQueue.forEach((item, index) => { + if (item.song.id === song.id) { + processingQueue.splice(index, 1); + } + }); +}; export const onSongEnd = async (song: Song) => { await prisma.song.update({ @@ -40,59 +84,42 @@ export const onSongEnd = async (song: Song) => { ee.emit(`onUpdate`, { song: { remove: [song.id] } }); client.voiceConnection.stopStream(); + + removeSongFromQueue(song); + const nextSong = await getNextSong(); if (nextSong) { - await playSong(nextSong); + await addSongToQueue(nextSong); } }; -const handleSongErrorTimeout = async (song: Song) => { - // Ignore as error is no longer relevant - if (!playError || playing) { - return; - } +async function onSongError(this: ProcessQueueItem, error: string) { + try { + if (stream) { + stream.destroy(); + } - if (playError.retryCount > MAX_RETRIES) { - return onSongEnd(song); - } + this.retryCount += 1; - return playSong(song); -}; + const shouldStop = this.retryCount > MAX_RETRIES; -const onPlayError = (song: Song, error: string) => { - // No longer relevant - if (playing?.id) { - playError = undefined; - return; - } + sendErrorMessage(shouldStop ? "event.error" : "event.retry", { + item: this.song.title, + error, + }); - if (!playError || playError.id !== song.id) { - playError = { - id: song.id, - retryCount: 0, - }; - } + if (shouldStop) { + this.status = ProcessQueueItemStatus.skipped; + await onSongEnd(this.song); + } - if (stream) { - stream.destroy(); + setTimeout(() => { + void processQueue(this); + }, 5000); + } catch (e) { + console.log("e", e); } - - playError.retryCount += 1; - - sendErrorMessage( - playError.retryCount > MAX_RETRIES ? "event.error" : "event.retry", - { item: song.title, error } - ); - - setTimeout(() => { - handleSongErrorTimeout(song).catch((e) => { - if (e instanceof Error) { - onPlayError(song, e.message); - } - console.log("e", e); - }); - }, 5000); -}; +} const createStream = async (song: Song) => { if (song.type === SourceType.SONG) { @@ -106,61 +133,61 @@ const createStream = async (song: Song) => { throw new Error("Unknown song type"); }; -const onPlayStart = async (song: Song) => { - let currentSong = undefined; +async function onPlayStart(this: ProcessQueueItem) { + try { + this.status = ProcessQueueItemStatus.processing; - if (song.type === SourceType.SONG) { - const videoInfo = await getVideoInfo(song); - if (!videoInfo) { - throw new Error("Failed to get video info"); + if (this.status !== ProcessQueueItemStatus.processing) { + throw new Error("Failed to update song status"); } - currentSong = setCurrentSong({ - ...song, - startedAt: DateTime.now(), - duration: videoInfo.duration, + sendMessage(`event.source.${this.song.type}.start`, { + item: this.song.title, }); - const secondsLeft = currentSong.startedAt - .plus({ seconds: videoInfo.duration }) - .diffNow("seconds").seconds; + if (this.song.type === SourceType.SONG) { + const videoInfo = await getVideoInfo(this.song); + if (!videoInfo) { + throw new Error("Failed to get video info"); + } - if (endTimeout) { - clearTimeout(endTimeout); - } - endTimeout = setTimeout(() => { - onSongEnd(song).catch((e) => { - if (e instanceof Error) { - onPlayError(song, e.message); - } - console.log("e", e); - }); - }, secondsLeft * 1000); - } else { - currentSong = setCurrentSong({ - ...song, - startedAt: DateTime.now(), - }); - } + this.song.startedAt = DateTime.now(); + this.song.duration = videoInfo.duration; - await prisma.song.update({ - where: { - id: currentSong.id, - }, - data: { - started: true, - }, - }); + const secondsLeft = this.song.startedAt + .plus({ seconds: videoInfo.duration }) + .diffNow("seconds").seconds; - sendMessage(`event.source.${song.type}.start`, { item: song.title }); + if (endTimeout) clearTimeout(endTimeout); - return currentSong; -}; + endTimeout = setTimeout(() => { + onSongEnd(this.song).catch((e) => { + console.log("e", e); + }); + }, secondsLeft * 1000); + } else { + this.song.startedAt = DateTime.now(); + } -export const playSong = async (song: Song) => { - try { - if (playing) return; + await prisma.song.update({ + where: { + id: this.song.id, + }, + data: { + started: true, + }, + }); + return this.song; + } catch (e) { + if (e instanceof Error) { + void onSongError.call(this, e.message); + } + } +} + +export async function playSong(this: ProcessQueueItem) { + try { if (endTimeout) { clearTimeout(endTimeout); } @@ -169,17 +196,18 @@ export const playSong = async (song: Song) => { stream.destroy(); } - stream = await createStream(song); + stream = await createStream(this.song); + + ee.emit(`onUpdate`, { + song: { setPlaying: this.song }, + }); let started = false; stream.on("data", () => { if (!started) { started = true; - onPlayStart(song).catch((e) => { - if (e instanceof Error) { - onPlayError(song, e.message); - } + onPlayStart.call(this).catch((e) => { console.log("e", e); }); } @@ -187,38 +215,36 @@ export const playSong = async (song: Song) => { stream.on("error", (e) => { if (e instanceof Error) { - onPlayError(song, e.message); + void onSongError.call(this, e.message); } }); client.voiceConnection .playStream(stream, "0") .on("error", (message: string) => { - onPlayError(song, message); + void onSongError.call(this, message); }); } catch (e) { if (e instanceof Error) { - onPlayError(song, e.message); + void onSongError.call(this, e.message); } } -}; +} export const getCurrentSong = (): PlayingSong | undefined => { - return playing; -}; + const item = processingQueue.at(0); -export const setCurrentSong = (song: PlayingSong) => { - playing = song; - - ee.emit(`onUpdate`, { - song: { setPlaying: playing }, - }); + if (!item || item.status !== ProcessQueueItemStatus.processing) { + return undefined; + } - return playing; + return item.song; }; export const stopCurrentSong = () => { - playing = undefined; + if (stream) { + stream.destroy(); + } client.voiceConnection.stopStream(); clearTimeout(endTimeout); diff --git a/server/common/playlist/types.ts b/server/common/playlist/types.ts index 4ffe777..670f987 100644 --- a/server/common/playlist/types.ts +++ b/server/common/playlist/types.ts @@ -1,4 +1,33 @@ +import { Song } from "@prisma/client"; +import { PlayingSong } from "types/app"; + export interface PlayError { id: number; retryCount: number; } + +export enum ProcessQueueItemStatus { + pending = "pending", + processing = "processing", + skipped = "skipped", + error = "error", + done = "done", +} + +interface NonActiveItem { + status: + | ProcessQueueItemStatus.pending + | ProcessQueueItemStatus.error + | ProcessQueueItemStatus.skipped; + song: Song; +} + +interface ActiveItem { + status: ProcessQueueItemStatus.processing; + song: PlayingSong; +} + +export type ProcessQueueItem = (NonActiveItem | ActiveItem) & { + retryCount: number; + callback: () => Promise; +}; diff --git a/server/common/playlist/user.ts b/server/common/playlist/user.ts index 11c9564..0fce39a 100644 --- a/server/common/playlist/user.ts +++ b/server/common/playlist/user.ts @@ -4,9 +4,10 @@ import prisma from "../../prisma"; import { sendMessage } from "../../router/room/message"; import { MessageType } from "../../router/room/types"; import { + addSongToQueue, getCurrentSong, getNextSong, - playSong, + removeSongFromQueue, stopCurrentSong, } from "./internal"; @@ -77,36 +78,13 @@ export const addSongs = async ( if (!getCurrentSong()) { const nextSong = await getNextSong(); if (nextSong) { - await playSong(nextSong); + await addSongToQueue(nextSong); } } return addedSongs; }; -export const startPlay = async (user: OnlineUser) => { - const currentSong = getCurrentSong(); - - if (currentSong) { - throw new Error("Kappale on jo soimassa"); - } - - const nextSong = await getNextSong(); - if (nextSong) { - await playSong(nextSong); - - sendMessage(`event.source.${nextSong.type}.started`, { - user, - type: MessageType.ACTION, - item: nextSong.title, - }); - - return nextSong; - } - - throw new Error("Ei kappaleita jonossa"); -}; - export const removeSong = async (id: number, user: OnlineUser) => { const song = await prisma.song.update({ where: { @@ -133,10 +111,12 @@ export const removeSong = async (id: number, user: OnlineUser) => { ee.emit(`onUpdate`, { song: { remove: [song.id] } }); } + removeSongFromQueue(song); + if (!getCurrentSong()) { const nextSong = await getNextSong(); if (nextSong) { - await playSong(nextSong); + await addSongToQueue(nextSong); } } diff --git a/server/router/room/index.ts b/server/router/room/index.ts index 525585b..2a15d9a 100644 --- a/server/router/room/index.ts +++ b/server/router/room/index.ts @@ -9,7 +9,6 @@ import { playNext, removeSong, shufflePlaylist, - startPlay, } from "../../common/playlist/user"; import ee from "../../eventEmitter"; import { t } from "../../trpc"; @@ -111,11 +110,7 @@ export const roomRouter = t.router({ .mutation(async ({ ctx, input }) => { const { onlineUser } = ctx; - if (input.id) { - return removeSong(input.id, onlineUser); - } - - return startPlay(onlineUser); + return removeSong(input.id, onlineUser); }), clearPlaylist: onlineUserProcedure.mutation(async ({ ctx }) => { const { onlineUser } = ctx;