Skip to content

Commit

Permalink
Merge pull request #17 from subquery/http-keep-alive
Browse files Browse the repository at this point in the history
lazy decode messages
  • Loading branch information
stwiname authored Jun 27, 2022
2 parents 52ac9f3 + baaa237 commit ecb6c78
Show file tree
Hide file tree
Showing 4 changed files with 166 additions and 103 deletions.
105 changes: 60 additions & 45 deletions packages/node/src/indexer/indexer.manager.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright 2020-2022 OnFinality Limited authors & contributors
// SPDX-License-Identifier: Apache-2.0

import { Block } from '@cosmjs/tendermint-rpc';
import { Inject, Injectable } from '@nestjs/common';
import { EventEmitter2 } from '@nestjs/event-emitter';
import { hexToU8a, u8aEq } from '@polkadot/util';
Expand All @@ -22,6 +23,7 @@ import {
CosmosMessage,
CosmosTransaction,
} from '@subql/types-cosmos';
import { conformsTo } from 'lodash';
import { Sequelize } from 'sequelize';
import { NodeConfig } from '../configure/NodeConfig';
import { SubqlProjectDs, SubqueryProject } from '../configure/SubqueryProject';
Expand Down Expand Up @@ -215,27 +217,21 @@ export class IndexerManager {
}

private async indexBlockData(
{ block, events, messages, transactions }: BlockContent,
blockContent: BlockContent,
dataSources: SubqlProjectDs[],
getVM: (d: SubqlProjectDs) => IndexerSandbox,
): Promise<void> {
await this.indexBlockContent(block, dataSources, getVM);
await this.indexBlockContent(blockContent, dataSources, getVM);

for (const tx of transactions) {
await this.indexTransaction(tx, dataSources, getVM);
}
await this.indexTransaction(blockContent, dataSources, getVM);

for (const msg of messages) {
await this.indexMessage(msg, dataSources, getVM);
}
await this.indexMessage(blockContent, dataSources, getVM);

for (const evt of events) {
await this.indexEvent(evt, dataSources, getVM);
}
await this.indexEvent(blockContent, dataSources, getVM);
}

private async indexBlockContent(
block: CosmosBlock,
block: BlockContent,
dataSources: SubqlProjectDs[],
getVM: (d: SubqlProjectDs) => IndexerSandbox,
): Promise<void> {
Expand All @@ -245,84 +241,95 @@ export class IndexerManager {
}

private async indexTransaction(
transaction: CosmosTransaction,
block: BlockContent,
dataSources: SubqlProjectDs[],
getVM: (d: SubqlProjectDs) => IndexerSandbox,
): Promise<void> {
for (const ds of dataSources) {
await this.indexData(
SubqlCosmosHandlerKind.Transaction,
transaction,
block,
ds,
getVM(ds),
);
}
}

private async indexMessage(
message: CosmosMessage,
block: BlockContent,
dataSources: SubqlProjectDs[],
getVM: (d: SubqlProjectDs) => IndexerSandbox,
): Promise<void> {
for (const ds of dataSources) {
await this.indexData(
SubqlCosmosHandlerKind.Message,
message,
block,
ds,
getVM(ds),
);
}
}

private async indexEvent(
event: CosmosEvent,
block: BlockContent,
dataSources: SubqlProjectDs[],
getVM: (d: SubqlProjectDs) => IndexerSandbox,
): Promise<void> {
for (const ds of dataSources) {
await this.indexData(SubqlCosmosHandlerKind.Event, event, ds, getVM(ds));
await this.indexData(SubqlCosmosHandlerKind.Event, block, ds, getVM(ds));
}
}

private async indexData<K extends SubqlCosmosHandlerKind>(
kind: K,
data: CosmosRuntimeHandlerInputMap[K],
//data: CosmosRuntimeHandlerInputMap[K],
block: BlockContent,
ds: SubqlProjectDs,
vm: IndexerSandbox,
): Promise<void> {
if (isRuntimeCosmosDs(ds)) {
const handlers = ds.mapping.handlers.filter(
(h) => h.kind === kind && FilterTypeMap[kind](data as any, h.filter),
(h) => h.kind === kind, //&& FilterTypeMap[kind](data as any, h.filter),
);

for (const handler of handlers) {
await vm.securedExec(handler.handler, [data]);
const blockData = BlockContentTypeMap[kind](block);

for (const data of blockData) {
const filteredHandlers = handlers.filter((h) =>
FilterTypeMap[kind](data as any, h.filter),
);
for (const handler of filteredHandlers) {
await vm.securedExec(handler.handler, [data]);
}
}
} else if (isCustomCosmosDs(ds)) {
const handlers = this.filterCustomDsHandlers<K>(
ds,
data,
ProcessorTypeMap[kind],
(data, baseFilter) => {
switch (kind) {
case SubqlCosmosHandlerKind.Message:
return !!CosmosUtil.filterMessages(
[data as CosmosMessage],
baseFilter,
).length;
case SubqlCosmosHandlerKind.Event:
return !!CosmosUtil.filterEvents(
[data as CosmosEvent],
baseFilter,
).length;
default:
throw new Error('Unsuported handler kind');
}
},
);
const blockData = BlockContentTypeMap[kind](block);
for (const data of blockData) {
const handlers = this.filterCustomDsHandlers<K>(
ds,
data as CosmosRuntimeHandlerInputMap[K],
ProcessorTypeMap[kind],
(data, baseFilter) => {
switch (kind) {
case SubqlCosmosHandlerKind.Message:
return !!CosmosUtil.filterMessages(
[data as CosmosMessage],
baseFilter,
).length;
case SubqlCosmosHandlerKind.Event:
return !!CosmosUtil.filterEvents(
[data as CosmosEvent],
baseFilter,
).length;
default:
throw new Error('Unsuported handler kind');
}
},
);

for (const handler of handlers) {
await this.transformAndExecuteCustomDs(ds, vm, handler, data);
for (const handler of handlers) {
await this.transformAndExecuteCustomDs(ds, vm, handler, data);
}
}
}
}
Expand Down Expand Up @@ -399,3 +406,11 @@ const FilterTypeMap = {
[SubqlCosmosHandlerKind.Event]: CosmosUtil.filterEvent,
[SubqlCosmosHandlerKind.Message]: CosmosUtil.filterMessageData,
};

const BlockContentTypeMap = {
[SubqlCosmosHandlerKind.Block]: (block: BlockContent) => [block.block],
[SubqlCosmosHandlerKind.Transaction]: (block: BlockContent) =>
block.transactions,
[SubqlCosmosHandlerKind.Message]: (block: BlockContent) => block.messages,
[SubqlCosmosHandlerKind.Event]: (block: BlockContent) => block.events,
};
84 changes: 44 additions & 40 deletions packages/node/src/utils/cosmos.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,9 @@ import { defaultRegistryTypes } from '@cosmjs/stargate';
import { Tendermint34Client } from '@cosmjs/tendermint-rpc';
import {
SubqlCosmosMessageFilter,
SubqlCosmosEventFilter,
CosmosBlock,
CosmosTransaction,
CosmosMessage,
CosmosEvent,
} from '@subql/types-cosmos';
import {
MsgClearAdmin,
Expand All @@ -27,7 +25,8 @@ import {
MsgUpdateAdmin,
} from 'cosmjs-types/cosmwasm/wasm/v1/tx';
import { CosmosClient } from '../indexer/api.service';
import { filterMessageData, filterEvent, wrapBlock, wrapEvent } from './cosmos';
import { filterMessageData, wrapEvent } from './cosmos';
import * as CosmosUtil from './cosmos';

const ENDPOINT = 'https://rpc.juno-1.api.onfinality.io';
const CHAINID = 'juno-1';
Expand Down Expand Up @@ -69,9 +68,23 @@ const TEST_NESTED_MESSAGE_FILTER_FALSE: SubqlCosmosMessageFilter = {
},
};

const TEST_NESTED_MESSAGE_FILTER_INVALID_PATH: SubqlCosmosMessageFilter = {
type: '/cosmwasm.wasm.v1.MsgExecuteContract',
contractCall: 'swap',
values: {
'msg.swap.input_token.xxx': 'Token2',
},
};

const TEST_MESSAGE_FILTER_FALSE_2: SubqlCosmosMessageFilter = {
type: '/cosmwasm.wasm.v1.MsgStoreCode',
};

describe('CosmosUtils', () => {
let api: CosmosClient;
let decodedTx;
let decodedTx: DecodedTxRaw;
let msg: CosmosMessage;

beforeAll(async () => {
const client = await CosmWasmClient.connect(ENDPOINT);
const tendermint = await Tendermint34Client.connect(ENDPOINT);
Expand All @@ -93,68 +106,47 @@ describe('CosmosUtils', () => {
'1A796F30DD866CA2E9A866084CB10BF13B5F6502256D6503E8B1BAC358B15701',
);
decodedTx = decodeTxRaw(txInfo.tx);
});

it('filter message data for true', () => {
const decodedMsg = api.decodeMsg<any>(decodedTx.body.messages[0]);
const msg: CosmosMessage = {
msg = {
idx: 0,
block: {} as CosmosBlock,
tx: {} as CosmosTransaction,
msg: {
typeUrl: decodedTx.body.messages[0].typeUrl,
...decodedMsg,
get decodedMsg() {
return api.decodeMsg<any>(decodedTx.body.messages[0]);
},
},
};
});

it('filter message data for true', () => {
const result = filterMessageData(msg, TEST_MESSAGE_FILTER_TRUE);
expect(result).toEqual(true);
});

it('filter message data for false', () => {
const decodedMsg = api.decodeMsg<any>(decodedTx.body.messages[0]);
const msg: CosmosMessage = {
idx: 0,
block: {} as CosmosBlock,
tx: {} as CosmosTransaction,
msg: {
typeUrl: decodedTx.body.messages[0].typeUrl,
...decodedMsg,
},
};
const result = filterMessageData(msg, TEST_MESSAGE_FILTER_FALSE);
expect(result).toEqual(false);
});

it('filter nested message data for true', () => {
const decodedMsg = api.decodeMsg<any>(decodedTx.body.messages[0]);
const msg: CosmosMessage = {
idx: 0,
block: {} as CosmosBlock,
tx: {} as CosmosTransaction,
msg: {
typeUrl: decodedTx.body.messages[0].typeUrl,
...decodedMsg,
},
};
const result = filterMessageData(msg, TEST_NESTED_MESSAGE_FILTER_TRUE);
expect(result).toEqual(true);
});

it('filter nested message data for false', () => {
const decodedMsg = api.decodeMsg<any>(decodedTx.body.messages[0]);
const msg: CosmosMessage = {
idx: 0,
block: {} as CosmosBlock,
tx: {} as CosmosTransaction,
msg: {
typeUrl: decodedTx.body.messages[0].typeUrl,
...decodedMsg,
},
};
const result = filterMessageData(msg, TEST_NESTED_MESSAGE_FILTER_FALSE);
expect(result).toEqual(false);
});

it('filter nested message data for invalid path', () => {
const result = filterMessageData(
msg,
TEST_NESTED_MESSAGE_FILTER_INVALID_PATH,
);
expect(result).toEqual(false);
});

it('does not wrap events of failed transaction', async () => {
const blockInfo = await api.blockResults(TEST_FAILTX_BLOCKNUMBER);
const failedTx = blockInfo.results[2];
Expand All @@ -168,4 +160,16 @@ describe('CosmosUtils', () => {
const events = wrapEvent({} as CosmosBlock, [tx], api);
expect(events.length).toEqual(0);
});

it('does not lazy decode failed message filters', () => {
const spy = jest.spyOn(msg.msg, 'decodedMsg', 'get');
const result = filterMessageData(msg, TEST_MESSAGE_FILTER_FALSE_2);
expect(spy).not.toHaveBeenCalled();
});

it('lazy decode passed message filters', () => {
const spy = jest.spyOn(msg.msg, 'decodedMsg', 'get');
const result = filterMessageData(msg, TEST_MESSAGE_FILTER_TRUE);
expect(spy).toHaveBeenCalled();
});
});
Loading

0 comments on commit ecb6c78

Please sign in to comment.