Skip to content

Commit

Permalink
Snappy frame encode big payloads as chunks as per standard
Browse files Browse the repository at this point in the history
  • Loading branch information
g11tech committed Apr 12, 2022
1 parent 6df28de commit ea61f5c
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,13 @@ import {createCompressStream} from "@chainsafe/snappy-stream";
import {RequestOrOutgoingResponseBody, OutgoingSerializer} from "../../types";
import {SszSnappyError, SszSnappyErrorCode} from "./errors";

/**
* As per the snappy framing format for streams, the size of any uncompressed chunk can be
* no longer than 65536 bytes.
*
* From: https://github.com/google/snappy/blob/main/framing_format.txt#L90:L92
*/
const UNCOMPRESSED_CHUNK_SIZE = 65536;
/**
* ssz_snappy encoding strategy writer.
* Yields byte chunks for encoded header and payload as defined in the spec:
Expand All @@ -30,7 +37,11 @@ export async function* writeSszSnappyPayload<T extends RequestOrOutgoingResponse
*/
function encodeSszSnappy(bytes: Buffer): AsyncGenerator<Buffer> {
const stream = createCompressStream();
stream.write(bytes);
for (let startFrom = 0; startFrom < bytes.length; startFrom += UNCOMPRESSED_CHUNK_SIZE) {
const endAt = startFrom + Math.min(bytes.length - startFrom, UNCOMPRESSED_CHUNK_SIZE);
const bytesChunk = bytes.slice(startFrom, endAt);
stream.write(bytesChunk);
}
stream.end();
return source<Buffer>(stream);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import chai, {expect} from "chai";
import chaiAsPromised from "chai-as-promised";
import varint from "varint";
import {BufferedSource} from "../../../../../../src/network/reqresp/utils";
import {readSszSnappyPayload} from "../../../../../../src/network/reqresp/encodingStrategies/sszSnappy";
import {isEqualSszType} from "../../../../../utils/ssz";
import {arrToSource} from "../../../../../../test/unit/network/reqresp/utils";
import {goerliShadowForkBlock13249} from "./testData";

chai.use(chaiAsPromised);

describe("network / reqresp / sszSnappy / decode", () => {
describe("Test data vectors (generated in a previous version)", () => {
const testCases = [goerliShadowForkBlock13249];

for (const {id, type, bytes, streamedBody, body} of testCases) {
const deserializedBody = body ?? type.deserialize(Buffer.from(bytes));
const streamedBytes = Buffer.concat([Buffer.from(varint.encode(bytes.length)), streamedBody]);

it(id, async () => {
const bufferedSource = new BufferedSource(arrToSource([streamedBytes]));
const bodyResult = await readSszSnappyPayload(bufferedSource, type);
expect(isEqualSszType(type, bodyResult, deserializedBody)).to.equal(true, "Wrong decoded body");
});
}
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import all from "it-all";
import pipe from "it-pipe";
import {expect} from "chai";
import varint from "varint";

import {allForks, ssz} from "@chainsafe/lodestar-types";

import {reqRespBlockResponseSerializer} from "../../../../../../src/network/reqresp/types";
import {writeSszSnappyPayload} from "../../../../../../src/network/reqresp/encodingStrategies/sszSnappy";
import {goerliShadowForkBlock13249} from "./testData";
import {RequestOrOutgoingResponseBody} from "../../../../../../src/network/reqresp/types";

describe("network / reqresp / sszSnappy / encode", () => {
describe("Test data vectors (generated in a previous version)", () => {
const testCases = [goerliShadowForkBlock13249];

for (const testCase of testCases) {
const {id, type, bytes, streamedBody, body} = testCase;
const deserializedBody = body ?? type.deserialize(Buffer.from(bytes));
const reqrespBody =
body ??
(type === ssz.bellatrix.SignedBeaconBlock
? {slot: (deserializedBody as allForks.SignedBeaconBlock).message.slot, bytes}
: deserializedBody);

it(id, async () => {
const encodedChunks = await pipe(
writeSszSnappyPayload(reqrespBody as RequestOrOutgoingResponseBody, reqRespBlockResponseSerializer),
all
);
const encodedStream = Buffer.concat(encodedChunks);
const expectedStreamed = Buffer.concat([Buffer.from(varint.encode(bytes.length)), streamedBody]);
expect(encodedStream).to.be.deep.equal(expectedStreamed);
});
}
});
});
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import fs from "node:fs";
import path from "node:path";

import {bellatrix, ssz} from "@chainsafe/lodestar-types";
import {RequestOrIncomingResponseBody, RequestOrResponseType} from "../../../../../../src/network/reqresp/types";

// This test data generated with code from 'master' at Jan 1st 2021
// commit: ea3ffab1ffb8093b61a8ebfa4b4432c604c10819

export interface ISszSnappyTestBlockData<T extends RequestOrIncomingResponseBody> {
id: string;
type: RequestOrResponseType;
bytes: Buffer;
streamedBody: Buffer;
body?: T;
}

/**
* A real big bellatrix block from goerli-shadow-fork-2 devnet, which is expected to be
* encoded in multiple chunks (apart from the legngth prefix and the snappy frames header)
*/

export const goerliShadowForkBlock13249: ISszSnappyTestBlockData<bellatrix.SignedBeaconBlock> = {
id: "goerli-shadow-fork-block-13249",
type: ssz.bellatrix.SignedBeaconBlock,
bytes: fs.readFileSync(path.join(__dirname, "/goerliShadowForkBlock.13249/serialized.ssz")),
streamedBody: fs.readFileSync(path.join(__dirname, "/goerliShadowForkBlock.13249/streamed.snappy")),
};

0 comments on commit ea61f5c

Please sign in to comment.