Skip to content
This repository has been archived by the owner on Dec 16, 2021. It is now read-only.

fix: waitForTransactionToBeProvable not waiting for blocks #338

Merged
merged 27 commits into from
Feb 17, 2021
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
2bcc1c9
fix: blockchainListener.waitForBlocks not waiting for blocks properly
antouhou Feb 8, 2021
7cb82d2
fix handler instantiation
antouhou Feb 8, 2021
314232f
fix: tests
Feb 9, 2021
6ceca4b
fix waiting for transaction to be provable
antouhou Feb 9, 2021
ea8abba
Merge remote-tracking branch 'origin/fix-block-wait' into fix-block-wait
antouhou Feb 9, 2021
d09d3be
fix tests
antouhou Feb 9, 2021
1a9e04a
simplified? promise a bit
antouhou Feb 9, 2021
fbb53fb
fix tests
antouhou Feb 9, 2021
af0a7a6
increased test coverage threshold
antouhou Feb 9, 2021
55001b2
kek
antouhou Feb 16, 2021
53f5dc2
remove promise handler factory, as there's no more generic handlers left
antouhou Feb 16, 2021
8408b38
fix race promise
antouhou Feb 16, 2021
3ce86cd
refactor: BlockchainListener
shumkov Feb 16, 2021
e22be10
Remove console.logs
antouhou Feb 16, 2021
8ff0aa8
refactor: wait for state transition result
shumkov Feb 16, 2021
4be94d1
Merge remote-tracking branch 'origin/fix-block-wait' into fix-block-wait
shumkov Feb 16, 2021
1a59f38
remove old tests
antouhou Feb 16, 2021
7bf04fe
remove unused variables
antouhou Feb 16, 2021
6113024
add new tests for BlockchainListener.js
antouhou Feb 16, 2021
73ee3db
add test for empty block
antouhou Feb 16, 2021
b22c897
test: implement tests
shumkov Feb 16, 2021
68d082b
Merge remote-tracking branch 'origin/fix-block-wait' into fix-block-wait
shumkov Feb 16, 2021
6110724
style: fix linter
shumkov Feb 16, 2021
f1b4078
useless commit
antouhou Feb 17, 2021
f6e0fdc
Merge remote-tracking branch 'origin/fix-block-wait' into fix-block-wait
antouhou Feb 17, 2021
d92b71a
revert useless commit
antouhou Feb 17, 2021
cd6dcdd
fix waitForStateTransitionResultHandlerFactory.js integration test
antouhou Feb 17, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 61 additions & 20 deletions lib/externalApis/tenderdash/BlockchainListener.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
const crypto = require('crypto');

const EventEmitter = require('events');
const TransactionWaitPeriodExceededError = require('../../errors/TransactionWaitPeriodExceededError');

Expand Down Expand Up @@ -26,6 +28,15 @@ class BlockchainListener extends EventEmitter {
return `transaction:${transactionHashString}`;
}

/**
*
* @param transactionHashString
* @return {string}
*/
static getTransactionAddedToTheBlockEventName(transactionHashString) {
return `blockTransactionAdded:${transactionHashString}`;
}

/**
* Subscribe to transactions and attach transaction event handler
*/
Expand All @@ -36,6 +47,7 @@ class BlockchainListener extends EventEmitter {
this.wsClient.subscribe(NEW_BLOCK_QUERY);
this.wsClient.on(NEW_BLOCK_QUERY, (message) => {
this.emit(events.NEW_BLOCK, message);
this.emitTransactionHashesInBlock(message.data.value.block.data.txs);
});
}

Expand All @@ -55,6 +67,16 @@ class BlockchainListener extends EventEmitter {
return handler;
}

emitTransactionHashesInBlock(serializedTransactions) {
serializedTransactions.forEach((base64tx) => {
const hashString = crypto.createHash('sha256')
.update(Buffer.from(base64tx, 'base64'))
.digest()
.toString('hex');
this.emit(BlockchainListener.getTransactionAddedToTheBlockEventName(hashString));
});
}

/**
* Emits transaction:%tx_hash% if there's a transaction in the message
* @private
Expand All @@ -76,37 +98,56 @@ class BlockchainListener extends EventEmitter {
* @param {number} [timeout] - timeout to reject after
* @return {Promise<Object>}
*/
waitForTransaction(hashString, timeout = 60000) {
waitForTransactionToBeProvable(hashString, timeout = 60000) {
const topic = BlockchainListener.getTransactionEventName(hashString);
let handler;
const txInBlockTopic = BlockchainListener
.getTransactionAddedToTheBlockEventName(hashString.toLowerCase());


let txHandler;
const txPromise = new Promise(((resolve) => {
txHandler = this.createPromiseHandler(topic, resolve);
shumkov marked this conversation as resolved.
Show resolved Hide resolved
this.on(topic, txHandler);
}));

let txInBlockHandler;
let newBlockHandler;
const txInBlockPromise = new Promise(((resolve) => {
let seenTransaction = false;
txInBlockHandler = () => {
seenTransaction = true;
};
newBlockHandler = () => {
if (seenTransaction) {
// Note that this will resolve only after two blocks. That is because the first block will
// flip the 'seenTransaction' toggle to true, and transaction will become provable
// only on the next block after the block it was included into
this.off(events.NEW_BLOCK, newBlockHandler);
resolve();
}
};
this.once(txInBlockTopic, txInBlockHandler);
this.on(events.NEW_BLOCK, newBlockHandler);
}));

return Promise.race([
new Promise((resolve) => {
handler = this.createPromiseHandler(topic, resolve);
this.on(topic, handler);
new Promise(async (resolve) => {
Promise.all([
txPromise,
txInBlockPromise,
]).then(([data]) => resolve(data));
}),
new Promise((resolve, reject) => {
setTimeout(() => {
this.off(topic, handler);
// Detaching old handlers
this.off(topic, txHandler);
this.off(txInBlockTopic, txInBlockHandler);
this.off(events.NEW_BLOCK, newBlockHandler);
reject(new TransactionWaitPeriodExceededError(hashString));
}, timeout);
}),
]);
}

waitForNextBlock() {
return new Promise((resolve) => {
this.once(events.NEW_BLOCK, resolve);
});
}

async waitForBlocks(countToWait = 1) {
let blocksSeen = 0;
while (blocksSeen !== countToWait) {
await this.waitForNextBlock();
blocksSeen += 1;
}
}
}

BlockchainListener.TX_QUERY = TX_QUERY;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ function waitForStateTransitionResultHandlerFactory(
let data;

try {
data = await blockchainListener.waitForTransaction(hashString, stateTransitionWaitTimeout);
data = await blockchainListener
.waitForTransactionToBeProvable(hashString, stateTransitionWaitTimeout);
} catch (e) {
if (e instanceof TransactionWaitPeriodExceededError) {
throw new DeadlineExceededGrpcError(
Expand Down Expand Up @@ -114,10 +115,6 @@ function waitForStateTransitionResultHandlerFactory(
return response;
}

// The first block is the one with the current ST, second block is the
// block in which st result will be included in the proof
await blockchainListener.waitForBlocks(2);

if (prove) {
const stateTransition = await dpp.stateTransition.createFromBuffer(
Buffer.from(tx, 'base64'),
Expand Down
126 changes: 70 additions & 56 deletions test/integration/externalApis/tenderdash/BlockchainListener.spec.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
const EventEmitter = require('events');
const crypto = require('crypto');
const BlockchainListener = require('../../../../lib/externalApis/tenderdash/BlockchainListener');
const TransactionWaitPeriodExceededError = require('../../../../lib/errors/TransactionWaitPeriodExceededError');

Expand All @@ -7,6 +8,10 @@ describe('BlockchainListener', () => {
let wsClientMock;
let blockchainListener;
let txDataMock;
const base64tx = 'aaaa';
let txHash;
let blockWithTxMock;
let emptyBlockMock;

beforeEach(function beforeEach() {
({ sinon } = this);
Expand All @@ -15,12 +20,24 @@ describe('BlockchainListener', () => {
blockchainListener = new BlockchainListener(wsClientMock);
blockchainListener.start();

txHash = crypto.createHash('sha256')
.update(Buffer.from(base64tx, 'base64'))
.digest()
.toString('hex');

txDataMock = {
events: {
'tx.hash': ['123'],
'tx.hash': [txHash],
},
};

blockWithTxMock = {
data: { value: { block: { data: { txs: [base64tx] } } } },
};
emptyBlockMock = {
data: { value: { block: { data: { txs: [] } } } },
};

sinon.spy(blockchainListener, 'on');
sinon.spy(blockchainListener, 'off');
sinon.spy(blockchainListener, 'emit');
Expand All @@ -40,47 +57,58 @@ describe('BlockchainListener', () => {

describe('.getTransactionEventName', () => {
it('should return transaction event name', () => {
const eventName = BlockchainListener.getTransactionEventName('123');
expect(eventName).to.be.equal('transaction:123');
const eventName = BlockchainListener.getTransactionEventName(txHash);
expect(eventName).to.be.equal(`transaction:${txHash}`);
});
});

describe('#waitForTransaction', () => {
it('should remove listener after transaction resolves', async () => {
const eventName = BlockchainListener.getTransactionEventName('123');
const txPromise = blockchainListener.waitForTransaction('123', 2000);
const txPromise = blockchainListener.waitForTransactionToBeProvable(txHash, 2000);

expect(blockchainListener.listenerCount(eventName)).to.be.equal(1);
// check that we attached events correctly
const events = blockchainListener.eventNames();
// transaction result, transaction in block an block events
expect(events.length).to.be.equal(3);
events.forEach((eventName) => {
expect(blockchainListener.listenerCount(eventName)).to.be.equal(1);
});

setTimeout(() => {
wsClientMock.emit(BlockchainListener.TX_QUERY, Object.assign({}, txDataMock));
wsClientMock.emit(BlockchainListener.NEW_BLOCK_QUERY, Object.assign({}, blockWithTxMock));
wsClientMock.emit(BlockchainListener.NEW_BLOCK_QUERY, Object.assign({}, emptyBlockMock));
}, 10);

const txData = await txPromise;

// Check that event listener was properly attached
expect(blockchainListener.on).to.be.calledOnce();
// Check that transaction data was emitted
expect(blockchainListener.emit).to.be.calledOnce();
expect(blockchainListener.on).to.be.calledThrice();
// Check that the event listener was properly removed
expect(blockchainListener.off).to.be.calledOnce();
expect(blockchainListener.listenerCount(eventName)).to.be.equal(0);
expect(blockchainListener.off).to.be.calledTwice();
events.forEach((eventName) => {
expect(blockchainListener.listenerCount(eventName)).to.be.equal(0);
});

expect(txData).to.be.deep.equal(txDataMock);
});

it('should not emit transaction event if event data has no transaction', async () => {
const eventName = BlockchainListener.getTransactionEventName('123');
txDataMock = {};

let events;
let error;
try {
const txPromise = blockchainListener.waitForTransaction('123', 1000);
const txPromise = blockchainListener.waitForTransactionToBeProvable(txHash, 1000);

expect(blockchainListener.listenerCount(eventName)).to.be.equal(1);
// check that we attached events correctly
events = blockchainListener.eventNames();
// transaction result, transaction in block an block events
expect(events.length).to.be.equal(3);
events.forEach((eventName) => {
expect(blockchainListener.listenerCount(eventName)).to.be.equal(1);
});

setTimeout(() => {
wsClientMock.emit(BlockchainListener.TX_QUERY, Object.assign({}, txDataMock));
wsClientMock.emit(BlockchainListener.NEW_BLOCK_QUERY, Object.assign({}, blockWithTxMock));
}, 10);

await txPromise;
Expand All @@ -90,64 +118,50 @@ describe('BlockchainListener', () => {

// Check that the error is correct
expect(error).to.be.instanceOf(TransactionWaitPeriodExceededError);
expect(error.message).to.be.equal('Transaction waiting period for 123 exceeded');
expect(error.getTransactionHash()).to.be.equal('123');
expect(error.message).to.be.equal(`Transaction waiting period for ${txHash} exceeded`);
expect(error.getTransactionHash()).to.be.equal(txHash);

// Check that event listener was properly attached
expect(blockchainListener.on).to.be.calledOnce();
// Check that event listener was properly removed
expect(blockchainListener.off).to.be.calledOnce();
expect(blockchainListener.listenerCount(eventName)).to.be.equal(0);
expect(blockchainListener.on).to.be.calledThrice();
events.forEach((eventName) => {
expect(blockchainListener.listenerCount(eventName)).to.be.equal(0);
});
// Check that no transaction data was emitted
expect(blockchainListener.emit).to.not.be.called();
expect(blockchainListener.emit).to.be.calledTwice();
});

it('should remove listener after timeout has been exceeded', async () => {
const eventName = BlockchainListener.getTransactionEventName('123');
let events;
let error;
try {
await blockchainListener.waitForTransaction('123', 100);
const txPromise = blockchainListener.waitForTransactionToBeProvable(txHash, 100);

events = blockchainListener.eventNames();
expect(events.length).to.be.equal(3);
events.forEach((eventName) => {
expect(blockchainListener.listenerCount(eventName)).to.be.equal(1);
});

await txPromise;
} catch (e) {
error = e;
}

// Check that the error is correct
expect(error).to.be.instanceOf(TransactionWaitPeriodExceededError);
expect(error.message).to.be.equal('Transaction waiting period for 123 exceeded');
expect(error.getTransactionHash()).to.be.equal('123');
expect(error.message).to.be.equal(`Transaction waiting period for ${txHash} exceeded`);
expect(error.getTransactionHash()).to.be.equal(txHash);

// Check that event listener was properly attached
expect(blockchainListener.on).to.be.calledOnce();
expect(blockchainListener.on).to.be.calledThrice();
// Check that event listener was properly removed
expect(blockchainListener.off).to.be.calledOnce();
expect(blockchainListener.listenerCount(eventName)).to.be.equal(0);
expect(blockchainListener.off).to.be.calledThrice();

events.forEach((eventName) => {
expect(blockchainListener.listenerCount(eventName)).to.be.equal(0);
});
// Check that no transaction data was emitted
expect(blockchainListener.emit).to.not.be.called();
});
});

describe('#waitForBlocks', () => {
it('should wait for n blocks and remove listeners afterwards', async () => {
const newBlockEvent = BlockchainListener.events.NEW_BLOCK;
const blockPromise = blockchainListener.waitForBlocks(2);

expect(blockchainListener.listenerCount(newBlockEvent)).to.be.equal(1);

setTimeout(() => {
wsClientMock.emit(BlockchainListener.NEW_BLOCK_QUERY, Object.assign({}, txDataMock));
}, 10);
setTimeout(() => {
wsClientMock.emit(BlockchainListener.NEW_BLOCK_QUERY, Object.assign({}, txDataMock));
}, 10);

await blockPromise;

// Check that event listener was properly attached
expect(blockchainListener.on).to.be.calledTwice();
// Check that transaction data was emitted
expect(blockchainListener.emit).to.be.calledTwice();
// Check that the event listener was properly removed
expect(blockchainListener.listenerCount(newBlockEvent)).to.be.equal(0);
});
});
});