Skip to content

Commit

Permalink
implement file chunk requests
Browse files Browse the repository at this point in the history
  • Loading branch information
liamcottle committed Dec 14, 2024
1 parent c8701c1 commit c4ee46b
Show file tree
Hide file tree
Showing 4 changed files with 210 additions and 10 deletions.
47 changes: 47 additions & 0 deletions src/js/Connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ class Connection {
static clientNotificationListeners = [];
static messageListeners = [];
static traceRouteListeners = [];
static fileChunkListeners = [];

static addMeshPacketListener(listener) {
this.meshPacketListeners.push(listener);
Expand Down Expand Up @@ -64,6 +65,16 @@ class Connection {
});
}

static addFileChunkListener(listener) {
this.fileChunkListeners.push(listener);
}

static removeFileChunkListener(listenerToRemove) {
this.fileChunkListeners = this.fileChunkListeners.filter((listener) => {
return listener !== listenerToRemove;
});
}

static async connectViaBluetooth() {

// ensure browser supports web bluetooth
Expand Down Expand Up @@ -492,6 +503,10 @@ class Connection {
await this.onFilePartPacket(meshPacket, fileTransferPacket.filePart);
} else if(fileTransferPacket.requestFileParts){
await this.onRequestFilePartsPacket(meshPacket, fileTransferPacket.requestFileParts);
} else if(fileTransferPacket.fileChunk){
await this.onFileChunkPacket(meshPacket, fileTransferPacket.fileChunk);
} else if(fileTransferPacket.requestFileChunk){
await this.onRequestFileChunkPacket(meshPacket, fileTransferPacket.requestFileChunk);
} else {
console.log("unhandled file transfer packet", fileTransferPacket);
}
Expand Down Expand Up @@ -721,6 +736,38 @@ class Connection {

}

static async onFileChunkPacket(meshPacket, fileChunk) {
for(const fileChunkListener of this.fileChunkListeners){
try {
fileChunkListener(meshPacket, fileChunk);
} catch(e){}
}
}

static async onRequestFileChunkPacket(meshPacket, requestFileChunk) {

// find existing file transfer
let fileTransfer = GlobalState.fileTransfers.find((fileTransfer) => {
return fileTransfer.id === requestFileChunk.fileTransferId;
});

// do nothing if file transfer not found
if(!fileTransfer){
return;
}

console.log(`[FileTransfer] ${fileTransfer.id} requested FileChunk[offset=${requestFileChunk.offset}, length=${requestFileChunk.length}]`);

// update file transfer progress
const filePointer = requestFileChunk.offset + requestFileChunk.length;
fileTransfer.status = FileTransferrer.STATUS_SENDING;
fileTransfer.progress = Math.min(100, Math.ceil(filePointer / fileTransfer.filesize * 100));

// send file part
await FileTransferrer.sendFileChunk(fileTransfer, requestFileChunk.offset, requestFileChunk.length);

}

}

export default Connection;
21 changes: 21 additions & 0 deletions src/js/FileTransferAPI.js
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,27 @@ class FileTransferAPI {
});
}

static async requestFileChunk(nodeId, fileTransferId, offset, length) {
await this.sendFileTransferPacket(nodeId, {
requestFileChunk: {
fileTransferId: fileTransferId,
offset: offset,
length: length,
},
});
}

static async sendFileChunk(nodeId, fileTransferId, offset, length, data) {
await this.sendFileTransferPacket(nodeId, {
fileChunk: {
fileTransferId: fileTransferId,
offset: offset,
length: length,
data: data,
},
});
}

}

export default FileTransferAPI;
137 changes: 127 additions & 10 deletions src/js/FileTransferrer.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import NodeAPI from "./NodeAPI.js";
import GlobalState from "./GlobalState.js";
import FileTransferAPI from "./FileTransferAPI.js";
import Connection from "./Connection.js";

class FileTransferrer {

Expand Down Expand Up @@ -67,19 +68,52 @@ class FileTransferrer {

static async acceptFileTransfer(fileTransfer) {

for(var attempt = 1; attempt <= this.MAX_PACKET_ATTEMPTS; attempt++){
try {
this.log(`acceptFileTransfer attempt ${attempt}`);
await FileTransferAPI.acceptFileTransfer(fileTransfer.from, fileTransfer.id);
fileTransfer.status = this.STATUS_ACCEPTED;
// create buffer for file data
fileTransfer.status = this.STATUS_ACCEPTED;
fileTransfer.data = new Uint8Array(0);

// loop until all bytes received
var offset = 0;
var length = 200;
while(fileTransfer.data.length < fileTransfer.filesize){

// stop fetching file chunks if the file transfer has been cancelled
if(fileTransfer.status === FileTransferrer.STATUS_CANCELLED){
return;
} catch(e) {
console.log(e);
if(attempt === this.MAX_PACKET_ATTEMPTS){
this.log("acceptFileTransfer failed", e);
throw e;
}

try {

// fetch next file chunk
offset = fileTransfer.data.length;
const fileChunk = await FileTransferrer.getFileChunk(fileTransfer, offset, length);

// append received data
fileTransfer.data = new Uint8Array([
...fileTransfer.data,
...fileChunk.data,
]);

// check if completed
if(fileTransfer.data.length === fileTransfer.filesize){
// todo check integrity of received data (implement a crc or hash)
fileTransfer.status = FileTransferrer.STATUS_COMPLETED;
fileTransfer.blob = new Blob([fileTransfer.data], {
type: "application/octet-stream",
});
await FileTransferrer.completeFileTransfer(fileTransfer);
return;
}

// update file transfer progress
const filePointer = fileChunk.offset + fileChunk.length;
fileTransfer.status = FileTransferrer.STATUS_RECEIVING;
fileTransfer.progress = Math.min(100, Math.ceil(filePointer / fileTransfer.filesize * 100));

} catch(e) {
this.log("failed to get file chunk", e);
}

}

}
Expand Down Expand Up @@ -182,6 +216,23 @@ class FileTransferrer {
}
}

static async requestFileChunk(fileTransfer, offset, length) {
await FileTransferAPI.requestFileChunk(fileTransfer.from, fileTransfer.id, offset, length);
}

static async sendFileChunk(fileTransfer, offset, length) {

// update status
fileTransfer.status = FileTransferrer.STATUS_SENDING;

// get data for this part
const data = fileTransfer.data.slice(offset, offset + length);

// send file chunk
await FileTransferAPI.sendFileChunk(fileTransfer.to, fileTransfer.id, offset, length, data);

}

static async completeFileTransfer(fileTransfer) {
for(var attempt = 1; attempt <= this.MAX_PACKET_ATTEMPTS; attempt++){
try {
Expand All @@ -198,6 +249,72 @@ class FileTransferrer {
}
}

/**
* Fetches a file chunk for the provided file transfer
* @param fileTransfer the file transfer to get a file chunk from
* @param offset the offset to fetch from
* @param length the length of data to fetch
* @param timeoutMillis how long to wait for file chunk before giving up
* @returns {Promise<unknown>}
*/
static async getFileChunk(fileTransfer, offset, length, timeoutMillis = 15000) {
var timeout = null;
var fileChunkListener = null;
return new Promise(async (resolve, reject) => {
try {

// handle file chunk
fileChunkListener = (meshPacket, fileChunk) => {

// ignore packet if not from expected node
if(meshPacket.from !== fileTransfer.from){
this.log("ignoring file chunk that isn't from the node that offered this file transfer");
return;
}

// ignore packet if not for requested file transfer id
if(fileChunk.fileTransferId !== fileTransfer.id){
this.log("ignoring file chunk that isn't for this file transfer");
return;
}

// ignore packet if not for requested offset and length
if(fileChunk.offset !== offset || fileChunk.length !== length){
this.log("ignoring file chunk that isn't for the offset and length we requested");
return;
}

// we have file chunk, so we no longer want to time out
clearTimeout(timeout);

// stop listening for file chunks
Connection.removeFileChunkListener(fileChunkListener);

// resolve promise
resolve(fileChunk);

};

// timeout after configured delay
timeout = setTimeout(() => {
Connection.removeFileChunkListener(fileChunkListener);
reject("timeout");
}, timeoutMillis);

// listen for file chunks
Connection.addFileChunkListener(fileChunkListener);

// request file chunk
await this.requestFileChunk(fileTransfer, offset, length);

} catch(e) {
clearTimeout(timeout);
Connection.removeFileChunkListener(fileChunkListener);
reject(e);
}
});
}

}

export default FileTransferrer;
15 changes: 15 additions & 0 deletions src/public/protos/file_transfer.proto
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ message FileTransferPacket {
optional CompletedFileTransfer completedFileTransfer = 5;
optional FilePart filePart = 6;
optional RequestFileParts requestFileParts = 7;
optional RequestFileChunk requestFileChunk = 8;
optional FileChunk fileChunk = 9;
}

message OfferFileTransfer {
Expand Down Expand Up @@ -56,3 +58,16 @@ message FilePart {
uint32 totalParts = 3;
bytes data = 4;
}

message RequestFileChunk {
uint32 fileTransferId = 1;
uint32 offset = 2;
uint32 length = 3;
}

message FileChunk {
uint32 fileTransferId = 1;
uint32 offset = 2;
uint32 length = 3;
bytes data = 4;
}

0 comments on commit c4ee46b

Please sign in to comment.