diff --git a/packages/lodestar/package.json b/packages/lodestar/package.json index 0f0559a4bf18..ccb74e2efa74 100644 --- a/packages/lodestar/package.json +++ b/packages/lodestar/package.json @@ -76,7 +76,7 @@ "@chainsafe/lodestar-utils": "^0.36.0", "@chainsafe/lodestar-validator": "^0.36.0", "@chainsafe/persistent-merkle-tree": "^0.4.1", - "@chainsafe/snappy-stream": "5.0.0", + "@chainsafe/snappy-stream": "5.1.1", "@chainsafe/ssz": "^0.9.1", "@ethersproject/abi": "^5.0.0", "@types/datastore-level": "^3.0.0", diff --git a/packages/lodestar/src/network/reqresp/encodingStrategies/sszSnappy/encode.ts b/packages/lodestar/src/network/reqresp/encodingStrategies/sszSnappy/encode.ts index 1bd27bf40325..b59c85f1f29d 100644 --- a/packages/lodestar/src/network/reqresp/encodingStrategies/sszSnappy/encode.ts +++ b/packages/lodestar/src/network/reqresp/encodingStrategies/sszSnappy/encode.ts @@ -29,6 +29,14 @@ export async function* writeSszSnappyPayload { + /** + * Use sync version (default) for compress as it is almost 2x faster than async + * one and most payloads are "1 chunk" and 100kb payloads (which would mostly be + * big bellatrix blocks with transactions) are just 2 chunks + * + * To use async version (for e.g. on big payloads) instantiate the stream with + * `createCompressStream({asyncCompress: true})` + */ const stream = createCompressStream(); stream.write(bytes); stream.end(); diff --git a/packages/lodestar/test/unit-mainnet/network/reqresp/encodingStrategies/sszSnappy/decode.test.ts b/packages/lodestar/test/unit-mainnet/network/reqresp/encodingStrategies/sszSnappy/decode.test.ts new file mode 100644 index 000000000000..d7299d6c297d --- /dev/null +++ b/packages/lodestar/test/unit-mainnet/network/reqresp/encodingStrategies/sszSnappy/decode.test.ts @@ -0,0 +1,27 @@ +import chai, {expect} from "chai"; +import chaiAsPromised from "chai-as-promised"; +import varint from "varint"; +import {BufferedSource} from "../../../../../../src/network/reqresp/utils"; +import {readSszSnappyPayload} from "../../../../../../src/network/reqresp/encodingStrategies/sszSnappy"; +import {isEqualSszType} from "../../../../../utils/ssz"; +import {arrToSource} from "../../../../../../test/unit/network/reqresp/utils"; +import {goerliShadowForkBlock13249} from "./testData"; + +chai.use(chaiAsPromised); + +describe("network / reqresp / sszSnappy / decode", () => { + describe("Test data vectors (generated in a previous version)", () => { + const testCases = [goerliShadowForkBlock13249]; + + for (const {id, type, bytes, streamedBody, body} of testCases) { + const deserializedBody = body ?? type.deserialize(Buffer.from(bytes)); + const streamedBytes = Buffer.concat([Buffer.from(varint.encode(bytes.length)), streamedBody]); + + it(id, async () => { + const bufferedSource = new BufferedSource(arrToSource([streamedBytes])); + const bodyResult = await readSszSnappyPayload(bufferedSource, type); + expect(isEqualSszType(type, bodyResult, deserializedBody)).to.equal(true, "Wrong decoded body"); + }); + } + }); +}); diff --git a/packages/lodestar/test/unit-mainnet/network/reqresp/encodingStrategies/sszSnappy/encode.test.ts b/packages/lodestar/test/unit-mainnet/network/reqresp/encodingStrategies/sszSnappy/encode.test.ts new file mode 100644 index 000000000000..2a206e6cb08c --- /dev/null +++ b/packages/lodestar/test/unit-mainnet/network/reqresp/encodingStrategies/sszSnappy/encode.test.ts @@ -0,0 +1,37 @@ +import all from "it-all"; +import pipe from "it-pipe"; +import {expect} from "chai"; +import varint from "varint"; + +import {allForks, ssz} from "@chainsafe/lodestar-types"; + +import {reqRespBlockResponseSerializer} from "../../../../../../src/network/reqresp/types"; +import {writeSszSnappyPayload} from "../../../../../../src/network/reqresp/encodingStrategies/sszSnappy"; +import {goerliShadowForkBlock13249} from "./testData"; +import {RequestOrOutgoingResponseBody} from "../../../../../../src/network/reqresp/types"; + +describe("network / reqresp / sszSnappy / encode", () => { + describe("Test data vectors (generated in a previous version)", () => { + const testCases = [goerliShadowForkBlock13249]; + + for (const testCase of testCases) { + const {id, type, bytes, streamedBody, body} = testCase; + const deserializedBody = body ?? type.deserialize(Buffer.from(bytes)); + const reqrespBody = + body ?? + (type === ssz.bellatrix.SignedBeaconBlock + ? {slot: (deserializedBody as allForks.SignedBeaconBlock).message.slot, bytes} + : deserializedBody); + + it(id, async () => { + const encodedChunks = await pipe( + writeSszSnappyPayload(reqrespBody as RequestOrOutgoingResponseBody, reqRespBlockResponseSerializer), + all + ); + const encodedStream = Buffer.concat(encodedChunks); + const expectedStreamed = Buffer.concat([Buffer.from(varint.encode(bytes.length)), streamedBody]); + expect(encodedStream).to.be.deep.equal(expectedStreamed); + }); + } + }); +}); diff --git a/packages/lodestar/test/unit-mainnet/network/reqresp/encodingStrategies/sszSnappy/goerliShadowForkBlock.13249/serialized.ssz b/packages/lodestar/test/unit-mainnet/network/reqresp/encodingStrategies/sszSnappy/goerliShadowForkBlock.13249/serialized.ssz new file mode 100644 index 000000000000..1be96160512e Binary files /dev/null and b/packages/lodestar/test/unit-mainnet/network/reqresp/encodingStrategies/sszSnappy/goerliShadowForkBlock.13249/serialized.ssz differ diff --git a/packages/lodestar/test/unit-mainnet/network/reqresp/encodingStrategies/sszSnappy/goerliShadowForkBlock.13249/streamed.snappy b/packages/lodestar/test/unit-mainnet/network/reqresp/encodingStrategies/sszSnappy/goerliShadowForkBlock.13249/streamed.snappy new file mode 100644 index 000000000000..4589528f6184 Binary files /dev/null and b/packages/lodestar/test/unit-mainnet/network/reqresp/encodingStrategies/sszSnappy/goerliShadowForkBlock.13249/streamed.snappy differ diff --git a/packages/lodestar/test/unit-mainnet/network/reqresp/encodingStrategies/sszSnappy/testData.ts b/packages/lodestar/test/unit-mainnet/network/reqresp/encodingStrategies/sszSnappy/testData.ts new file mode 100644 index 000000000000..3de2ab4e6b0d --- /dev/null +++ b/packages/lodestar/test/unit-mainnet/network/reqresp/encodingStrategies/sszSnappy/testData.ts @@ -0,0 +1,25 @@ +import fs from "node:fs"; +import path from "node:path"; + +import {bellatrix, ssz} from "@chainsafe/lodestar-types"; +import {RequestOrIncomingResponseBody, RequestOrResponseType} from "../../../../../../src/network/reqresp/types"; + +export interface ISszSnappyTestBlockData { + id: string; + type: RequestOrResponseType; + bytes: Buffer; + streamedBody: Buffer; + body?: T; +} + +/** + * A real big bellatrix block from goerli-shadow-fork-2 devnet, which is expected to be + * encoded in multiple chunks. + */ + +export const goerliShadowForkBlock13249: ISszSnappyTestBlockData = { + id: "goerli-shadow-fork-block-13249", + type: ssz.bellatrix.SignedBeaconBlock, + bytes: fs.readFileSync(path.join(__dirname, "/goerliShadowForkBlock.13249/serialized.ssz")), + streamedBody: fs.readFileSync(path.join(__dirname, "/goerliShadowForkBlock.13249/streamed.snappy")), +}; diff --git a/packages/lodestar/test/unit/network/reqresp/encodingStrategies/sszSnappy/snappy-frames/uncompress.test.ts b/packages/lodestar/test/unit/network/reqresp/encodingStrategies/sszSnappy/snappy-frames/uncompress.test.ts index 113914652846..96e6c0ced33d 100644 --- a/packages/lodestar/test/unit/network/reqresp/encodingStrategies/sszSnappy/snappy-frames/uncompress.test.ts +++ b/packages/lodestar/test/unit/network/reqresp/encodingStrategies/sszSnappy/snappy-frames/uncompress.test.ts @@ -27,10 +27,12 @@ describe("snappy frames uncompress", function () { const decompress = new SnappyFramesUncompress(); const testData = Buffer.alloc(100000, 4).toString(); + let result = Buffer.alloc(0); compressStream.on("data", function (data) { - const result = decompress.uncompress(data); - if (result) { + // testData will come compressed as two or more chunks + result = Buffer.concat([result, decompress.uncompress(data) ?? Buffer.alloc(0)]); + if (result.length === testData.length) { expect(result.toString()).to.be.equal(testData); done(); } diff --git a/packages/lodestar/test/unit/network/reqresp/encodingStrategies/sszSnappy/testData.ts b/packages/lodestar/test/unit/network/reqresp/encodingStrategies/sszSnappy/testData.ts index 96974ddb1501..9d77c94c8386 100644 --- a/packages/lodestar/test/unit/network/reqresp/encodingStrategies/sszSnappy/testData.ts +++ b/packages/lodestar/test/unit/network/reqresp/encodingStrategies/sszSnappy/testData.ts @@ -9,6 +9,9 @@ export interface ISszSnappyTestData { id: string; type: RequestOrResponseType; body: T; + /** chunks expected in an async compress version of snappy stream */ + asyncChunks: Buffer[]; + /** chunks expected in a sync compress version of snappy stream */ chunks: Buffer[]; } @@ -16,11 +19,12 @@ export const sszSnappyPing: ISszSnappyTestData = { id: "Ping type", type: ssz.phase0.Ping, body: BigInt(1), - chunks: [ + asyncChunks: [ "0x08", // length prefix "0xff060000734e61507059", // snappy frames header "0x010c00000175de410100000000000000", // snappy frames content ].map(fromHexString) as Buffer[], + chunks: ["0x08", "0xff060000734e61507059010c00000175de410100000000000000"].map(fromHexString) as Buffer[], }; export const sszSnappyStatus: ISszSnappyTestData = { @@ -33,11 +37,14 @@ export const sszSnappyStatus: ISszSnappyTestData = { headRoot: Buffer.alloc(32, 0xda), headSlot: 9, }, - chunks: [ + asyncChunks: [ "0x54", // length prefix "0xff060000734e61507059", // snappy frames header "0x001b0000097802c15400da8a010004090009017e2b001c0900000000000000", ].map(fromHexString) as Buffer[], + chunks: ["0x54", "0xff060000734e61507059001b0000097802c15400da8a010004090009017e2b001c0900000000000000"].map( + fromHexString + ) as Buffer[], }; export const sszSnappySignedBeaconBlockPhase0: ISszSnappyTestData = { @@ -66,11 +73,15 @@ export const sszSnappySignedBeaconBlockPhase0: ISszSnappyTestData = { @@ -87,9 +98,13 @@ export const sszSnappySignedBeaconBlockAltair: ISszSnappyTestData