From c4ee46b33c1dba8ff4638bdbef70d8ad58e0921f Mon Sep 17 00:00:00 2001 From: liamcottle Date: Sat, 14 Dec 2024 23:36:25 +1300 Subject: [PATCH] implement file chunk requests --- src/js/Connection.js | 47 +++++++++ src/js/FileTransferAPI.js | 21 ++++ src/js/FileTransferrer.js | 137 ++++++++++++++++++++++++-- src/public/protos/file_transfer.proto | 15 +++ 4 files changed, 210 insertions(+), 10 deletions(-) diff --git a/src/js/Connection.js b/src/js/Connection.js index f053fc2..628ec5a 100644 --- a/src/js/Connection.js +++ b/src/js/Connection.js @@ -13,6 +13,7 @@ class Connection { static clientNotificationListeners = []; static messageListeners = []; static traceRouteListeners = []; + static fileChunkListeners = []; static addMeshPacketListener(listener) { this.meshPacketListeners.push(listener); @@ -64,6 +65,16 @@ class Connection { }); } + static addFileChunkListener(listener) { + this.fileChunkListeners.push(listener); + } + + static removeFileChunkListener(listenerToRemove) { + this.fileChunkListeners = this.fileChunkListeners.filter((listener) => { + return listener !== listenerToRemove; + }); + } + static async connectViaBluetooth() { // ensure browser supports web bluetooth @@ -492,6 +503,10 @@ class Connection { await this.onFilePartPacket(meshPacket, fileTransferPacket.filePart); } else if(fileTransferPacket.requestFileParts){ await this.onRequestFilePartsPacket(meshPacket, fileTransferPacket.requestFileParts); + } else if(fileTransferPacket.fileChunk){ + await this.onFileChunkPacket(meshPacket, fileTransferPacket.fileChunk); + } else if(fileTransferPacket.requestFileChunk){ + await this.onRequestFileChunkPacket(meshPacket, fileTransferPacket.requestFileChunk); } else { console.log("unhandled file transfer packet", fileTransferPacket); } @@ -721,6 +736,38 @@ class Connection { } + static async onFileChunkPacket(meshPacket, fileChunk) { + for(const fileChunkListener of this.fileChunkListeners){ + try { + fileChunkListener(meshPacket, fileChunk); + } catch(e){} + } + } + + static async onRequestFileChunkPacket(meshPacket, requestFileChunk) { + + // find existing file transfer + let fileTransfer = GlobalState.fileTransfers.find((fileTransfer) => { + return fileTransfer.id === requestFileChunk.fileTransferId; + }); + + // do nothing if file transfer not found + if(!fileTransfer){ + return; + } + + console.log(`[FileTransfer] ${fileTransfer.id} requested FileChunk[offset=${requestFileChunk.offset}, length=${requestFileChunk.length}]`); + + // update file transfer progress + const filePointer = requestFileChunk.offset + requestFileChunk.length; + fileTransfer.status = FileTransferrer.STATUS_SENDING; + fileTransfer.progress = Math.min(100, Math.ceil(filePointer / fileTransfer.filesize * 100)); + + // send file part + await FileTransferrer.sendFileChunk(fileTransfer, requestFileChunk.offset, requestFileChunk.length); + + } + } export default Connection; diff --git a/src/js/FileTransferAPI.js b/src/js/FileTransferAPI.js index 01006c7..5abc99b 100644 --- a/src/js/FileTransferAPI.js +++ b/src/js/FileTransferAPI.js @@ -99,6 +99,27 @@ class FileTransferAPI { }); } + static async requestFileChunk(nodeId, fileTransferId, offset, length) { + await this.sendFileTransferPacket(nodeId, { + requestFileChunk: { + fileTransferId: fileTransferId, + offset: offset, + length: length, + }, + }); + } + + static async sendFileChunk(nodeId, fileTransferId, offset, length, data) { + await this.sendFileTransferPacket(nodeId, { + fileChunk: { + fileTransferId: fileTransferId, + offset: offset, + length: length, + data: data, + }, + }); + } + } export default FileTransferAPI; diff --git a/src/js/FileTransferrer.js b/src/js/FileTransferrer.js index ea78a8f..0015055 100644 --- a/src/js/FileTransferrer.js +++ b/src/js/FileTransferrer.js @@ -1,6 +1,7 @@ import NodeAPI from "./NodeAPI.js"; import GlobalState from "./GlobalState.js"; import FileTransferAPI from "./FileTransferAPI.js"; +import Connection from "./Connection.js"; class FileTransferrer { @@ -67,19 +68,52 @@ class FileTransferrer { static async acceptFileTransfer(fileTransfer) { - for(var attempt = 1; attempt <= this.MAX_PACKET_ATTEMPTS; attempt++){ - try { - this.log(`acceptFileTransfer attempt ${attempt}`); - await FileTransferAPI.acceptFileTransfer(fileTransfer.from, fileTransfer.id); - fileTransfer.status = this.STATUS_ACCEPTED; + // create buffer for file data + fileTransfer.status = this.STATUS_ACCEPTED; + fileTransfer.data = new Uint8Array(0); + + // loop until all bytes received + var offset = 0; + var length = 200; + while(fileTransfer.data.length < fileTransfer.filesize){ + + // stop fetching file chunks if the file transfer has been cancelled + if(fileTransfer.status === FileTransferrer.STATUS_CANCELLED){ return; - } catch(e) { - console.log(e); - if(attempt === this.MAX_PACKET_ATTEMPTS){ - this.log("acceptFileTransfer failed", e); - throw e; + } + + try { + + // fetch next file chunk + offset = fileTransfer.data.length; + const fileChunk = await FileTransferrer.getFileChunk(fileTransfer, offset, length); + + // append received data + fileTransfer.data = new Uint8Array([ + ...fileTransfer.data, + ...fileChunk.data, + ]); + + // check if completed + if(fileTransfer.data.length === fileTransfer.filesize){ + // todo check integrity of received data (implement a crc or hash) + fileTransfer.status = FileTransferrer.STATUS_COMPLETED; + fileTransfer.blob = new Blob([fileTransfer.data], { + type: "application/octet-stream", + }); + await FileTransferrer.completeFileTransfer(fileTransfer); + return; } + + // update file transfer progress + const filePointer = fileChunk.offset + fileChunk.length; + fileTransfer.status = FileTransferrer.STATUS_RECEIVING; + fileTransfer.progress = Math.min(100, Math.ceil(filePointer / fileTransfer.filesize * 100)); + + } catch(e) { + this.log("failed to get file chunk", e); } + } } @@ -182,6 +216,23 @@ class FileTransferrer { } } + static async requestFileChunk(fileTransfer, offset, length) { + await FileTransferAPI.requestFileChunk(fileTransfer.from, fileTransfer.id, offset, length); + } + + static async sendFileChunk(fileTransfer, offset, length) { + + // update status + fileTransfer.status = FileTransferrer.STATUS_SENDING; + + // get data for this part + const data = fileTransfer.data.slice(offset, offset + length); + + // send file chunk + await FileTransferAPI.sendFileChunk(fileTransfer.to, fileTransfer.id, offset, length, data); + + } + static async completeFileTransfer(fileTransfer) { for(var attempt = 1; attempt <= this.MAX_PACKET_ATTEMPTS; attempt++){ try { @@ -198,6 +249,72 @@ class FileTransferrer { } } + /** + * Fetches a file chunk for the provided file transfer + * @param fileTransfer the file transfer to get a file chunk from + * @param offset the offset to fetch from + * @param length the length of data to fetch + * @param timeoutMillis how long to wait for file chunk before giving up + * @returns {Promise} + */ + static async getFileChunk(fileTransfer, offset, length, timeoutMillis = 15000) { + var timeout = null; + var fileChunkListener = null; + return new Promise(async (resolve, reject) => { + try { + + // handle file chunk + fileChunkListener = (meshPacket, fileChunk) => { + + // ignore packet if not from expected node + if(meshPacket.from !== fileTransfer.from){ + this.log("ignoring file chunk that isn't from the node that offered this file transfer"); + return; + } + + // ignore packet if not for requested file transfer id + if(fileChunk.fileTransferId !== fileTransfer.id){ + this.log("ignoring file chunk that isn't for this file transfer"); + return; + } + + // ignore packet if not for requested offset and length + if(fileChunk.offset !== offset || fileChunk.length !== length){ + this.log("ignoring file chunk that isn't for the offset and length we requested"); + return; + } + + // we have file chunk, so we no longer want to time out + clearTimeout(timeout); + + // stop listening for file chunks + Connection.removeFileChunkListener(fileChunkListener); + + // resolve promise + resolve(fileChunk); + + }; + + // timeout after configured delay + timeout = setTimeout(() => { + Connection.removeFileChunkListener(fileChunkListener); + reject("timeout"); + }, timeoutMillis); + + // listen for file chunks + Connection.addFileChunkListener(fileChunkListener); + + // request file chunk + await this.requestFileChunk(fileTransfer, offset, length); + + } catch(e) { + clearTimeout(timeout); + Connection.removeFileChunkListener(fileChunkListener); + reject(e); + } + }); + } + } export default FileTransferrer; diff --git a/src/public/protos/file_transfer.proto b/src/public/protos/file_transfer.proto index 4c9817a..1145da5 100644 --- a/src/public/protos/file_transfer.proto +++ b/src/public/protos/file_transfer.proto @@ -20,6 +20,8 @@ message FileTransferPacket { optional CompletedFileTransfer completedFileTransfer = 5; optional FilePart filePart = 6; optional RequestFileParts requestFileParts = 7; + optional RequestFileChunk requestFileChunk = 8; + optional FileChunk fileChunk = 9; } message OfferFileTransfer { @@ -56,3 +58,16 @@ message FilePart { uint32 totalParts = 3; bytes data = 4; } + +message RequestFileChunk { + uint32 fileTransferId = 1; + uint32 offset = 2; + uint32 length = 3; +} + +message FileChunk { + uint32 fileTransferId = 1; + uint32 offset = 2; + uint32 length = 3; + bytes data = 4; +}