diff --git a/packages/node/src/indexer/indexer.manager.ts b/packages/node/src/indexer/indexer.manager.ts index f30194d78..2d612fdd1 100644 --- a/packages/node/src/indexer/indexer.manager.ts +++ b/packages/node/src/indexer/indexer.manager.ts @@ -1,6 +1,7 @@ // Copyright 2020-2022 OnFinality Limited authors & contributors // SPDX-License-Identifier: Apache-2.0 +import { Block } from '@cosmjs/tendermint-rpc'; import { Inject, Injectable } from '@nestjs/common'; import { EventEmitter2 } from '@nestjs/event-emitter'; import { hexToU8a, u8aEq } from '@polkadot/util'; @@ -22,6 +23,7 @@ import { CosmosMessage, CosmosTransaction, } from '@subql/types-cosmos'; +import { conformsTo } from 'lodash'; import { Sequelize } from 'sequelize'; import { NodeConfig } from '../configure/NodeConfig'; import { SubqlProjectDs, SubqueryProject } from '../configure/SubqueryProject'; @@ -215,27 +217,21 @@ export class IndexerManager { } private async indexBlockData( - { block, events, messages, transactions }: BlockContent, + blockContent: BlockContent, dataSources: SubqlProjectDs[], getVM: (d: SubqlProjectDs) => IndexerSandbox, ): Promise { - await this.indexBlockContent(block, dataSources, getVM); + await this.indexBlockContent(blockContent, dataSources, getVM); - for (const tx of transactions) { - await this.indexTransaction(tx, dataSources, getVM); - } + await this.indexTransaction(blockContent, dataSources, getVM); - for (const msg of messages) { - await this.indexMessage(msg, dataSources, getVM); - } + await this.indexMessage(blockContent, dataSources, getVM); - for (const evt of events) { - await this.indexEvent(evt, dataSources, getVM); - } + await this.indexEvent(blockContent, dataSources, getVM); } private async indexBlockContent( - block: CosmosBlock, + block: BlockContent, dataSources: SubqlProjectDs[], getVM: (d: SubqlProjectDs) => IndexerSandbox, ): Promise { @@ -245,14 +241,14 @@ export class IndexerManager { } private async indexTransaction( - transaction: CosmosTransaction, + block: BlockContent, dataSources: SubqlProjectDs[], getVM: (d: SubqlProjectDs) => IndexerSandbox, ): Promise { for (const ds of dataSources) { await this.indexData( SubqlCosmosHandlerKind.Transaction, - transaction, + block, ds, getVM(ds), ); @@ -260,14 +256,14 @@ export class IndexerManager { } private async indexMessage( - message: CosmosMessage, + block: BlockContent, dataSources: SubqlProjectDs[], getVM: (d: SubqlProjectDs) => IndexerSandbox, ): Promise { for (const ds of dataSources) { await this.indexData( SubqlCosmosHandlerKind.Message, - message, + block, ds, getVM(ds), ); @@ -275,54 +271,65 @@ export class IndexerManager { } private async indexEvent( - event: CosmosEvent, + block: BlockContent, dataSources: SubqlProjectDs[], getVM: (d: SubqlProjectDs) => IndexerSandbox, ): Promise { for (const ds of dataSources) { - await this.indexData(SubqlCosmosHandlerKind.Event, event, ds, getVM(ds)); + await this.indexData(SubqlCosmosHandlerKind.Event, block, ds, getVM(ds)); } } private async indexData( kind: K, - data: CosmosRuntimeHandlerInputMap[K], + //data: CosmosRuntimeHandlerInputMap[K], + block: BlockContent, ds: SubqlProjectDs, vm: IndexerSandbox, ): Promise { if (isRuntimeCosmosDs(ds)) { const handlers = ds.mapping.handlers.filter( - (h) => h.kind === kind && FilterTypeMap[kind](data as any, h.filter), + (h) => h.kind === kind, //&& FilterTypeMap[kind](data as any, h.filter), ); - for (const handler of handlers) { - await vm.securedExec(handler.handler, [data]); + const blockData = BlockContentTypeMap[kind](block); + + for (const data of blockData) { + const filteredHandlers = handlers.filter((h) => + FilterTypeMap[kind](data as any, h.filter), + ); + for (const handler of filteredHandlers) { + await vm.securedExec(handler.handler, [data]); + } } } else if (isCustomCosmosDs(ds)) { - const handlers = this.filterCustomDsHandlers( - ds, - data, - ProcessorTypeMap[kind], - (data, baseFilter) => { - switch (kind) { - case SubqlCosmosHandlerKind.Message: - return !!CosmosUtil.filterMessages( - [data as CosmosMessage], - baseFilter, - ).length; - case SubqlCosmosHandlerKind.Event: - return !!CosmosUtil.filterEvents( - [data as CosmosEvent], - baseFilter, - ).length; - default: - throw new Error('Unsuported handler kind'); - } - }, - ); + const blockData = BlockContentTypeMap[kind](block); + for (const data of blockData) { + const handlers = this.filterCustomDsHandlers( + ds, + data as CosmosRuntimeHandlerInputMap[K], + ProcessorTypeMap[kind], + (data, baseFilter) => { + switch (kind) { + case SubqlCosmosHandlerKind.Message: + return !!CosmosUtil.filterMessages( + [data as CosmosMessage], + baseFilter, + ).length; + case SubqlCosmosHandlerKind.Event: + return !!CosmosUtil.filterEvents( + [data as CosmosEvent], + baseFilter, + ).length; + default: + throw new Error('Unsuported handler kind'); + } + }, + ); - for (const handler of handlers) { - await this.transformAndExecuteCustomDs(ds, vm, handler, data); + for (const handler of handlers) { + await this.transformAndExecuteCustomDs(ds, vm, handler, data); + } } } } @@ -399,3 +406,11 @@ const FilterTypeMap = { [SubqlCosmosHandlerKind.Event]: CosmosUtil.filterEvent, [SubqlCosmosHandlerKind.Message]: CosmosUtil.filterMessageData, }; + +const BlockContentTypeMap = { + [SubqlCosmosHandlerKind.Block]: (block: BlockContent) => [block.block], + [SubqlCosmosHandlerKind.Transaction]: (block: BlockContent) => + block.transactions, + [SubqlCosmosHandlerKind.Message]: (block: BlockContent) => block.messages, + [SubqlCosmosHandlerKind.Event]: (block: BlockContent) => block.events, +}; diff --git a/packages/node/src/utils/cosmos.spec.ts b/packages/node/src/utils/cosmos.spec.ts index f981b3125..9b9f31bd9 100644 --- a/packages/node/src/utils/cosmos.spec.ts +++ b/packages/node/src/utils/cosmos.spec.ts @@ -12,11 +12,9 @@ import { defaultRegistryTypes } from '@cosmjs/stargate'; import { Tendermint34Client } from '@cosmjs/tendermint-rpc'; import { SubqlCosmosMessageFilter, - SubqlCosmosEventFilter, CosmosBlock, CosmosTransaction, CosmosMessage, - CosmosEvent, } from '@subql/types-cosmos'; import { MsgClearAdmin, @@ -27,7 +25,8 @@ import { MsgUpdateAdmin, } from 'cosmjs-types/cosmwasm/wasm/v1/tx'; import { CosmosClient } from '../indexer/api.service'; -import { filterMessageData, filterEvent, wrapBlock, wrapEvent } from './cosmos'; +import { filterMessageData, wrapEvent } from './cosmos'; +import * as CosmosUtil from './cosmos'; const ENDPOINT = 'https://rpc.juno-1.api.onfinality.io'; const CHAINID = 'juno-1'; @@ -69,9 +68,23 @@ const TEST_NESTED_MESSAGE_FILTER_FALSE: SubqlCosmosMessageFilter = { }, }; +const TEST_NESTED_MESSAGE_FILTER_INVALID_PATH: SubqlCosmosMessageFilter = { + type: '/cosmwasm.wasm.v1.MsgExecuteContract', + contractCall: 'swap', + values: { + 'msg.swap.input_token.xxx': 'Token2', + }, +}; + +const TEST_MESSAGE_FILTER_FALSE_2: SubqlCosmosMessageFilter = { + type: '/cosmwasm.wasm.v1.MsgStoreCode', +}; + describe('CosmosUtils', () => { let api: CosmosClient; - let decodedTx; + let decodedTx: DecodedTxRaw; + let msg: CosmosMessage; + beforeAll(async () => { const client = await CosmWasmClient.connect(ENDPOINT); const tendermint = await Tendermint34Client.connect(ENDPOINT); @@ -93,68 +106,47 @@ describe('CosmosUtils', () => { '1A796F30DD866CA2E9A866084CB10BF13B5F6502256D6503E8B1BAC358B15701', ); decodedTx = decodeTxRaw(txInfo.tx); - }); - - it('filter message data for true', () => { - const decodedMsg = api.decodeMsg(decodedTx.body.messages[0]); - const msg: CosmosMessage = { + msg = { idx: 0, block: {} as CosmosBlock, tx: {} as CosmosTransaction, msg: { typeUrl: decodedTx.body.messages[0].typeUrl, - ...decodedMsg, + get decodedMsg() { + return api.decodeMsg(decodedTx.body.messages[0]); + }, }, }; + }); + + it('filter message data for true', () => { const result = filterMessageData(msg, TEST_MESSAGE_FILTER_TRUE); expect(result).toEqual(true); }); it('filter message data for false', () => { - const decodedMsg = api.decodeMsg(decodedTx.body.messages[0]); - const msg: CosmosMessage = { - idx: 0, - block: {} as CosmosBlock, - tx: {} as CosmosTransaction, - msg: { - typeUrl: decodedTx.body.messages[0].typeUrl, - ...decodedMsg, - }, - }; const result = filterMessageData(msg, TEST_MESSAGE_FILTER_FALSE); expect(result).toEqual(false); }); it('filter nested message data for true', () => { - const decodedMsg = api.decodeMsg(decodedTx.body.messages[0]); - const msg: CosmosMessage = { - idx: 0, - block: {} as CosmosBlock, - tx: {} as CosmosTransaction, - msg: { - typeUrl: decodedTx.body.messages[0].typeUrl, - ...decodedMsg, - }, - }; const result = filterMessageData(msg, TEST_NESTED_MESSAGE_FILTER_TRUE); expect(result).toEqual(true); }); it('filter nested message data for false', () => { - const decodedMsg = api.decodeMsg(decodedTx.body.messages[0]); - const msg: CosmosMessage = { - idx: 0, - block: {} as CosmosBlock, - tx: {} as CosmosTransaction, - msg: { - typeUrl: decodedTx.body.messages[0].typeUrl, - ...decodedMsg, - }, - }; const result = filterMessageData(msg, TEST_NESTED_MESSAGE_FILTER_FALSE); expect(result).toEqual(false); }); + it('filter nested message data for invalid path', () => { + const result = filterMessageData( + msg, + TEST_NESTED_MESSAGE_FILTER_INVALID_PATH, + ); + expect(result).toEqual(false); + }); + it('does not wrap events of failed transaction', async () => { const blockInfo = await api.blockResults(TEST_FAILTX_BLOCKNUMBER); const failedTx = blockInfo.results[2]; @@ -168,4 +160,16 @@ describe('CosmosUtils', () => { const events = wrapEvent({} as CosmosBlock, [tx], api); expect(events.length).toEqual(0); }); + + it('does not lazy decode failed message filters', () => { + const spy = jest.spyOn(msg.msg, 'decodedMsg', 'get'); + const result = filterMessageData(msg, TEST_MESSAGE_FILTER_FALSE_2); + expect(spy).not.toHaveBeenCalled(); + }); + + it('lazy decode passed message filters', () => { + const spy = jest.spyOn(msg.msg, 'decodedMsg', 'get'); + const result = filterMessageData(msg, TEST_MESSAGE_FILTER_TRUE); + expect(spy).toHaveBeenCalled(); + }); }); diff --git a/packages/node/src/utils/cosmos.ts b/packages/node/src/utils/cosmos.ts index e94687aa6..9098507b1 100644 --- a/packages/node/src/utils/cosmos.ts +++ b/packages/node/src/utils/cosmos.ts @@ -8,6 +8,7 @@ import { decodeTxRaw } from '@cosmjs/proto-signing'; import { Block } from '@cosmjs/stargate'; import { Log, parseRawLog } from '@cosmjs/stargate/build/logs'; import { BlockResultsResponse, TxData } from '@cosmjs/tendermint-rpc'; +import { isRuntimeCosmosDs } from '@subql/common-cosmos'; import { SubqlCosmosEventFilter, SubqlCosmosMessageFilter, @@ -15,7 +16,10 @@ import { CosmosEvent, CosmosTransaction, CosmosMessage, + SubqlCosmosHandlerKind, } from '@subql/types-cosmos'; +import { transpileModule } from 'typescript'; +import { SubqlProjectDs } from '../configure/SubqueryProject'; import { CosmosClient } from '../indexer/api.service'; import { BlockContent } from '../indexer/types'; import { getLogger } from './logger'; @@ -34,7 +38,7 @@ export function filterMessageData( for (const key in filter.values) { if ( filter.values[key] !== - key.split('.').reduce((acc, curr) => acc[curr], data.msg) + key.split('.').reduce((acc, curr) => acc[curr], data.msg.decodedMsg) ) { return false; } @@ -44,8 +48,8 @@ export function filterMessageData( filter.type === '/cosmwasm.wasm.v1.MsgExecuteContract' && filter.contractCall && !( - filter.contractCall === data.msg.msg || - filter.contractCall in data.msg.msg + filter.contractCall === data.msg.decodedMsg.msg || + filter.contractCall in data.msg.decodedMsg.msg ) ) { return false; @@ -158,7 +162,10 @@ export function wrapTx( block: block, tx, hash: toHex(sha256(block.block.txs[idx])).toUpperCase(), - decodedTx: decodeTxRaw(block.block.txs[idx]), + get decodedTx() { + delete (this as any).decodedTx; + return ((this.decodedTx as any) = decodeTxRaw(block.block.txs[idx])); + }, })); } @@ -175,7 +182,10 @@ function wrapCosmosMsg( block: block, msg: { typeUrl: rawMessage.typeUrl, - ...api.decodeMsg(rawMessage), + get decodedMsg() { + delete this.decodedMsg; + return (this.decodedMsg = api.decodeMsg(rawMessage)); + }, }, }; } @@ -242,16 +252,47 @@ export async function fetchBlocksBatches( // Make non-readonly const results = [...blockResults.results]; - const block = wrapBlock(blockInfo, results); - const transactions = wrapTx(block, results); - const messages = wrapMsg(block, transactions, api); - const events = wrapEvent(block, transactions, api); - - return { - block, - transactions, - messages, - events, - }; + return new LazyBlockContent(blockInfo, results, api); }); } + +class LazyBlockContent implements BlockContent { + private _wrappedBlock: CosmosBlock; + private _wrappedTransaction: CosmosTransaction[]; + private _wrappedMessage: CosmosMessage[]; + private _wrappedEvent: CosmosEvent[]; + + constructor( + private _blockInfo: Block, + private _results: TxData[], + private _api: CosmosClient, + ) {} + + get block() { + if (!this._wrappedBlock) { + this._wrappedBlock = wrapBlock(this._blockInfo, this._results); + } + return this._wrappedBlock; + } + + get transactions() { + if (!this._wrappedTransaction) { + this._wrappedTransaction = wrapTx(this.block, this._results); + } + return this._wrappedTransaction; + } + + get messages() { + if (!this._wrappedMessage) { + this._wrappedMessage = wrapMsg(this.block, this.transactions, this._api); + } + return this._wrappedMessage; + } + + get events() { + if (!this._wrappedEvent) { + this._wrappedEvent = wrapEvent(this.block, this.transactions, this._api); + } + return this._wrappedEvent; + } +} diff --git a/packages/types/src/interfaces.ts b/packages/types/src/interfaces.ts index 3f3e5aa73..34c35488e 100644 --- a/packages/types/src/interfaces.ts +++ b/packages/types/src/interfaces.ts @@ -35,11 +35,14 @@ export interface CosmosTransaction { decodedTx: DecodedTxRaw; } -export interface CosmosMessage { +export interface CosmosMessage { idx: number; block: CosmosBlock; tx: CosmosTransaction; - msg: any; + msg: { + typeUrl: string; + decodedMsg: T; + }; } export interface CosmosEvent {