Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: node 16 passthrough multiple callback error #88

Merged
merged 2 commits into from
Feb 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
261 changes: 135 additions & 126 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,11 @@ const fs = require('fs');
const nodePath = require('path'); // renamed to prevent conflicts in `scanDir`
const { promisify } = require('util');
const { execFile } = require('child_process');
const { PassThrough, Transform, ReadableStream } = require('stream');
const { PassThrough, Transform, Readable } = require('stream');
const { Socket } = require('dgram');
const NodeClamError = require('./lib/NodeClamError');
const NodeClamTransform = require('./lib/NodeClamTransform.js');

/**
* @typedef {ReadableStream} ReadableStream
*/

// Enable these once the FS.promises API is no longer experimental
// const fsPromises = require('fs').promises;
// const fsAccess = fsPromises.access;
Expand Down Expand Up @@ -1182,7 +1178,7 @@ class NodeClam {
// Ex. uploadStream.pipe(<this_transform_stream>).pipe(destination_stream)
return new Transform({
// This should be fired on each chunk received
async transform(chunk, encoding, cb) {
transform(chunk, encoding, cb) {
// DRY method for handling each chunk as it comes in
const doTransform = () => {
// Write data to our fork stream. If it fails,
Expand Down Expand Up @@ -1231,134 +1227,147 @@ class NodeClam {
// Setup an array to collect the responses from ClamAV
this._clamavResponseChunks = [];

try {
// Get a connection to the ClamAV Socket
this._clamavSocket = await me._initSocket('passthrough');
if (me.settings.debugMode) console.log(`${me.debugLabel}: ClamAV Socket Initialized...`);

// Setup a pipeline that will pass chunks through our custom Tranform and on to ClamAV
this._forkStream.pipe(this._clamavTransform).pipe(this._clamavSocket);

// When the CLamAV socket connection is closed (could be after 'end' or because of an error)...
this._clamavSocket
.on('close', (hadError) => {
if (me.settings.debugMode)
console.log(
`${me.debugLabel}: ClamAV socket has been closed! Because of Error:`,
hadError
);
this._clamavSocket.end();
})
// When the ClamAV socket connection ends (receives chunk)
.on('end', () => {
this._clamavSocket.end();
if (me.settings.debugMode)
console.log(`${me.debugLabel}: ClamAV socket has received the last chunk!`);
// Process the collected chunks
const response = Buffer.concat(this._clamavResponseChunks);
const result = me._processResult(response.toString('utf8'), null);
this._clamavResponseChunks = [];
if (me.settings.debugMode) {
console.log(`${me.debugLabel}: Result of scan:`, result);
console.log(
`${me.debugLabel}: It took ${_avScanTime} seconds to scan the file(s).`
);
clearScanBenchmark();
}
// Get a connection to the ClamAV Socket
me._initSocket('passthrough').then(
(socket) => {
this._clamavSocket = socket;

if (me.settings.debugMode) console.log(`${me.debugLabel}: ClamAV Socket Initialized...`);

// If the scan timed-out
if (result.timeout === true) this.emit('timeout');
// Setup a pipeline that will pass chunks through our custom Tranform and on to ClamAV
this._forkStream.pipe(this._clamavTransform).pipe(this._clamavSocket);

// NOTE: "scan-complete" could be called by the `handleError` method.
// We don't want to to double-emit this message.
if (_scanComplete === false) {
_scanComplete = true;
// When the CLamAV socket connection is closed (could be after 'end' or because of an error)...
this._clamavSocket
.on('close', (hadError) => {
if (me.settings.debugMode)
console.log(
`${me.debugLabel}: ClamAV socket has been closed! Because of Error:`,
hadError
);
this._clamavSocket.end();
this.emit('scan-complete', result);
}
})
// If connection timesout.
.on('timeout', () => {
this.emit('timeout', new Error('Connection to host/socket has timed out'));
this._clamavSocket.end();
if (me.settings.debugMode)
console.log(`${me.debugLabel}: Connection to host/socket has timed out`);
})
// When the ClamAV socket is ready to receive packets (this will probably never fire here)
.on('ready', () => {
if (me.settings.debugMode)
console.log(`${me.debugLabel}: ClamAV socket ready to receive`);
})
// When we are officially connected to the ClamAV socket (probably will never fire here)
.on('connect', () => {
if (me.settings.debugMode) console.log(`${me.debugLabel}: Connected to ClamAV socket`);
})
// If an error is emitted from the ClamAV socket
.on('error', (err) => {
console.error(`${me.debugLabel}: Error emitted from ClamAV socket: `, err);
handleError(err);
})
// If ClamAV is sending stuff to us (ie, an "OK", "Virus FOUND", or "ERROR")
.on('data', (cvChunk) => {
// Push this chunk to our results collection array
this._clamavResponseChunks.push(cvChunk);
if (me.settings.debugMode)
console.log(`${me.debugLabel}: Got result!`, cvChunk.toString());

// Parse what we've gotten back from ClamAV so far...
const response = Buffer.concat(this._clamavResponseChunks);
const result = me._processResult(response.toString(), null);

// If there's an error supplied or if we detect a virus or timeout, stop stream immediately.
if (
result instanceof NodeClamError ||
(typeof result === 'object' &&
(('isInfected' in result && result.isInfected === true) ||
('timeout' in result && result.timeout === true)))
) {
// If a virus is detected...
if (
typeof result === 'object' &&
'isInfected' in result &&
result.isInfected === true
) {
handleError(null, true, result);
})
// When the ClamAV socket connection ends (receives chunk)
.on('end', () => {
this._clamavSocket.end();
if (me.settings.debugMode)
console.log(`${me.debugLabel}: ClamAV socket has received the last chunk!`);
// Process the collected chunks
const response = Buffer.concat(this._clamavResponseChunks);
const result = me._processResult(response.toString('utf8'), null);
this._clamavResponseChunks = [];
if (me.settings.debugMode) {
console.log(`${me.debugLabel}: Result of scan:`, result);
console.log(
`${me.debugLabel}: It took ${_avScanTime} seconds to scan the file(s).`
);
clearScanBenchmark();
}

// If a timeout is detected...
else if (
typeof result === 'object' &&
'isInfected' in result &&
result.isInfected === true
) {
this.emit('timeout');
handleError(null, false, result);
}
// If the scan timed-out
if (result.timeout === true) this.emit('timeout');

// If any other kind of error is detected...
else {
handleError(result);
// NOTE: "scan-complete" could be called by the `handleError` method.
// We don't want to to double-emit this message.
if (_scanComplete === false) {
_scanComplete = true;
this._clamavSocket.end();
this.emit('scan-complete', result);
}
}
// For debugging purposes, spit out what was processed (if anything).
else if (me.settings.debugMode)
console.log(`${me.debugLabel}: Processed Result: `, result, response.toString());
});
})
// If connection timesout.
.on('timeout', () => {
this.emit('timeout', new Error('Connection to host/socket has timed out'));
this._clamavSocket.end();
if (me.settings.debugMode)
console.log(`${me.debugLabel}: Connection to host/socket has timed out`);
})
// When the ClamAV socket is ready to receive packets (this will probably never fire here)
.on('ready', () => {
if (me.settings.debugMode)
console.log(`${me.debugLabel}: ClamAV socket ready to receive`);
})
// When we are officially connected to the ClamAV socket (probably will never fire here)
.on('connect', () => {
if (me.settings.debugMode)
console.log(`${me.debugLabel}: Connected to ClamAV socket`);
})
// If an error is emitted from the ClamAV socket
.on('error', (err) => {
console.error(`${me.debugLabel}: Error emitted from ClamAV socket: `, err);
handleError(err);
})
// If ClamAV is sending stuff to us (ie, an "OK", "Virus FOUND", or "ERROR")
.on('data', (cvChunk) => {
// Push this chunk to our results collection array
this._clamavResponseChunks.push(cvChunk);
if (me.settings.debugMode)
console.log(`${me.debugLabel}: Got result!`, cvChunk.toString());

// Parse what we've gotten back from ClamAV so far...
const response = Buffer.concat(this._clamavResponseChunks);
const result = me._processResult(response.toString(), null);

// If there's an error supplied or if we detect a virus or timeout, stop stream immediately.
if (
result instanceof NodeClamError ||
(typeof result === 'object' &&
(('isInfected' in result && result.isInfected === true) ||
('timeout' in result && result.timeout === true)))
) {
// If a virus is detected...
if (
typeof result === 'object' &&
'isInfected' in result &&
result.isInfected === true
) {
handleError(null, true, result);
}

// If a timeout is detected...
else if (
typeof result === 'object' &&
'isInfected' in result &&
result.isInfected === true
) {
this.emit('timeout');
handleError(null, false, result);
}

// If any other kind of error is detected...
else {
handleError(result);
}
}
// For debugging purposes, spit out what was processed (if anything).
else if (me.settings.debugMode)
console.log(
`${me.debugLabel}: Processed Result: `,
result,
response.toString()
);
});

if (me.settings.debugMode) console.log(`${me.debugLabel}: Doing initial transform!`);
// Handle the chunk
doTransform();
},
(err) => {
// Close socket if it's currently valid
if (
this._clamavSocket &&
'readyState' in this._clamavSocket &&
this._clamavSocket.readyState
) {
this._clamavSocket.end();
}

if (me.settings.debugMode) console.log(`${me.debugLabel}: Doing initial transform!`);
// Handle the chunk
doTransform();
} catch (err) {
// Close socket if it's currently valid
if (this._clamavSocket && 'readyState' in this._clamavSocket && this._clamavSocket.readyState) {
this._clamavSocket.end();
// If there's an issue connecting to the ClamAV socket, this is where that's handled
if (me.settings.debugMode)
console.error(`${me.debugLabel}: Error initiating socket to ClamAV: `, err);
handleError(err);
}

// If there's an issue connecting to the ClamAV socket, this is where that's handled
if (me.settings.debugMode)
console.error(`${me.debugLabel}: Error initiating socket to ClamAV: `, err);
handleError(err);
}
);
} else {
// if (me.settings.debugMode) console.log(`${me.debugLabel}: Doing transform: ${++counter}`);
// Handle the chunk
Expand Down Expand Up @@ -2093,7 +2102,7 @@ class NodeClam {
* use of a TCP or UNIX Domain socket. In other words, this will not work if you only
* have access to a local ClamAV binary.
*
* @param {ReadableStream} stream - A readable stream to scan
* @param {Readable} stream - A readable stream to scan
* @param {Function} [cb] - What to do when the socket response with results
* @returns {Promise<object>} Object like: `{ file: String, isInfected: Boolean, viruses: Array }`
* @example
Expand Down
17 changes: 17 additions & 0 deletions tests/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -1530,6 +1530,23 @@ describe('passthrough', () => {
});
});

// https://github.com/kylefarris/clamscan/issues/82
it('should not throw multiple callback error', (done) => {
// To reliably reproduce the issue in the broken code, it's important that this is an async generator
// and it emits some chunks larger than the default highWaterMark of 16 KB.
async function* gen(i = 10) {
while (i < 25) {
yield Buffer.from(new Array(i++ * 1024).fill());
}
}

const input = Readable.from(gen());
const av = clamscan.passthrough();

// The failure case will throw an error and not finish
input.pipe(av).on('end', done).resume();
});

if (!process.env.CI) {
it('should handle a 0-byte file', () => {
const input = fs.createReadStream(emptyFile);
Expand Down