Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Snappy frame encode big payloads as chunks as per the standard #3912

Merged
merged 11 commits into from
May 4, 2022
2 changes: 1 addition & 1 deletion packages/lodestar/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@
"@chainsafe/lodestar-utils": "^0.36.0",
"@chainsafe/lodestar-validator": "^0.36.0",
"@chainsafe/persistent-merkle-tree": "^0.4.1",
"@chainsafe/snappy-stream": "5.0.0",
"@chainsafe/snappy-stream": "5.1.1",
"@chainsafe/ssz": "^0.9.1",
"@ethersproject/abi": "^5.0.0",
"@types/datastore-level": "^3.0.0",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ export async function* writeSszSnappyPayload<T extends RequestOrOutgoingResponse
* Buffered Snappy writer
*/
function encodeSszSnappy(bytes: Buffer): AsyncGenerator<Buffer> {
const stream = createCompressStream();
const stream = createCompressStream({asyncCompress: true});
dapplion marked this conversation as resolved.
Show resolved Hide resolved
stream.write(bytes);
stream.end();
return source<Buffer>(stream);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import chai, {expect} from "chai";
import chaiAsPromised from "chai-as-promised";
import varint from "varint";
import {BufferedSource} from "../../../../../../src/network/reqresp/utils";
import {readSszSnappyPayload} from "../../../../../../src/network/reqresp/encodingStrategies/sszSnappy";
import {isEqualSszType} from "../../../../../utils/ssz";
import {arrToSource} from "../../../../../../test/unit/network/reqresp/utils";
import {goerliShadowForkBlock13249} from "./testData";

chai.use(chaiAsPromised);

describe("network / reqresp / sszSnappy / decode", () => {
describe("Test data vectors (generated in a previous version)", () => {
const testCases = [goerliShadowForkBlock13249];

for (const {id, type, bytes, streamedBody, body} of testCases) {
const deserializedBody = body ?? type.deserialize(Buffer.from(bytes));
const streamedBytes = Buffer.concat([Buffer.from(varint.encode(bytes.length)), streamedBody]);

it(id, async () => {
const bufferedSource = new BufferedSource(arrToSource([streamedBytes]));
const bodyResult = await readSszSnappyPayload(bufferedSource, type);
expect(isEqualSszType(type, bodyResult, deserializedBody)).to.equal(true, "Wrong decoded body");
});
}
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import all from "it-all";
import pipe from "it-pipe";
import {expect} from "chai";
import varint from "varint";

import {allForks, ssz} from "@chainsafe/lodestar-types";

import {reqRespBlockResponseSerializer} from "../../../../../../src/network/reqresp/types";
import {writeSszSnappyPayload} from "../../../../../../src/network/reqresp/encodingStrategies/sszSnappy";
import {goerliShadowForkBlock13249} from "./testData";
import {RequestOrOutgoingResponseBody} from "../../../../../../src/network/reqresp/types";

describe("network / reqresp / sszSnappy / encode", () => {
describe("Test data vectors (generated in a previous version)", () => {
const testCases = [goerliShadowForkBlock13249];

for (const testCase of testCases) {
const {id, type, bytes, streamedBody, body} = testCase;
const deserializedBody = body ?? type.deserialize(Buffer.from(bytes));
const reqrespBody =
body ??
(type === ssz.bellatrix.SignedBeaconBlock
? {slot: (deserializedBody as allForks.SignedBeaconBlock).message.slot, bytes}
: deserializedBody);

it(id, async () => {
const encodedChunks = await pipe(
writeSszSnappyPayload(reqrespBody as RequestOrOutgoingResponseBody, reqRespBlockResponseSerializer),
all
);
const encodedStream = Buffer.concat(encodedChunks);
const expectedStreamed = Buffer.concat([Buffer.from(varint.encode(bytes.length)), streamedBody]);
expect(encodedStream).to.be.deep.equal(expectedStreamed);
});
}
});
});
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import fs from "node:fs";
import path from "node:path";

import {bellatrix, ssz} from "@chainsafe/lodestar-types";
import {RequestOrIncomingResponseBody, RequestOrResponseType} from "../../../../../../src/network/reqresp/types";

export interface ISszSnappyTestBlockData<T extends RequestOrIncomingResponseBody> {
id: string;
type: RequestOrResponseType;
bytes: Buffer;
streamedBody: Buffer;
body?: T;
}

/**
* A real big bellatrix block from goerli-shadow-fork-2 devnet, which is expected to be
* encoded in multiple chunks.
*/

export const goerliShadowForkBlock13249: ISszSnappyTestBlockData<bellatrix.SignedBeaconBlock> = {
id: "goerli-shadow-fork-block-13249",
type: ssz.bellatrix.SignedBeaconBlock,
bytes: fs.readFileSync(path.join(__dirname, "/goerliShadowForkBlock.13249/serialized.ssz")),
streamedBody: fs.readFileSync(path.join(__dirname, "/goerliShadowForkBlock.13249/streamed.snappy")),
};
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,12 @@ describe("snappy frames uncompress", function () {
const decompress = new SnappyFramesUncompress();

const testData = Buffer.alloc(100000, 4).toString();
let result = Buffer.alloc(0);

compressStream.on("data", function (data) {
const result = decompress.uncompress(data);
if (result) {
// testData will come compressed as two or more chunks
result = Buffer.concat([result, decompress.uncompress(data) ?? Buffer.alloc(0)]);
if (result.length === testData.length) {
expect(result.toString()).to.be.equal(testData);
done();
}
Expand Down
2 changes: 1 addition & 1 deletion packages/lodestar/types/snappy-stream/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@ declare module "@chainsafe/snappy-stream" {
import {Transform} from "node:stream";

export function createUncompressStream(opts?: {asBuffer?: boolean}): Transform;
export function createCompressStream(): Transform;
export function createCompressStream(opts?:{asyncCompress?: boolean}): Transform;
}
10 changes: 5 additions & 5 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -493,10 +493,10 @@
resolved "https://registry.yarnpkg.com/@chainsafe/persistent-ts/-/persistent-ts-0.19.1.tgz#53d03aa31ef7698b09327f74eef01e286b97bae3"
integrity sha512-fUFFFFxdcpYkMAHnjm83EYL/R/smtVmEkJr3FGSI6dwPk4ue9rXjEHf7FTd3V8AbVOcTJGriN4cYf2V+HOYkjQ==

"@chainsafe/snappy-stream@5.0.0":
version "5.0.0"
resolved "https://registry.yarnpkg.com/@chainsafe/snappy-stream/-/snappy-stream-5.0.0.tgz#87dfb8dd6e5a20c7e982700974fd59941f5a96c4"
integrity sha512-E5Y9KsyTMjXGgoLl2sIetiIpztum4NUznNDAYa+DoN20HjGNUv4ZSB5rnQrlWKVq6POnkL6vPTZC2TLYosR0wA==
"@chainsafe/snappy-stream@5.1.1":
version "5.1.1"
resolved "https://registry.yarnpkg.com/@chainsafe/snappy-stream/-/snappy-stream-5.1.1.tgz#9f3c79fd936b591d4a79d1801ffb582df54f1858"
integrity sha512-wy1c0RLUttVYMQHN/zs473LIzZ6NEL2xG3T8vJp+Ag99H/lQldnwYY6aKGcMrt6hEhndRx0a4NaUxr70MTzsLg==
dependencies:
"@chainsafe/fast-crc32c" "3.0.0"
bl "^1.0.0"
Expand Down Expand Up @@ -7063,7 +7063,7 @@ libnpmaccess@^4.0.1:
npm-package-arg "^8.1.2"
npm-registry-fetch "^10.0.0"

libnpmpublish@4.0.0, libnpmpublish@^4.0.0:
libnpmpublish@^4.0.0:
version "4.0.0"
resolved "https://registry.yarnpkg.com/libnpmpublish/-/libnpmpublish-4.0.0.tgz#ad6413914e0dfd78df868ce14ba3d3a4cc8b385b"
integrity sha512-2RwYXRfZAB1x/9udKpZmqEzSqNd7ouBRU52jyG14/xG8EF+O9A62d7/XVR3iABEQHf1iYhkm0Oq9iXjrL3tsXA==
Expand Down