From ff6a4ea05993e6da9970614fa18c0d6a95d77e93 Mon Sep 17 00:00:00 2001 From: Gus Narea Date: Tue, 15 Dec 2020 12:28:58 +0000 Subject: [PATCH] mostly complete implementation --- .dockerignore | 1 + package-lock.json | 6 +- package.json | 2 +- src/_test_utils.ts | 89 ++++++-- src/functional_tests/cogrpc_server.test.ts | 93 ++++++--- src/functional_tests/internet_e2e.test.ts | 37 ++-- src/functional_tests/poweb_server.test.ts | 3 +- src/functional_tests/utils.ts | 4 +- src/services/cogrpc/server.spec.ts | 4 +- src/services/cogrpc/server.ts | 2 +- src/services/cogrpc/service.spec.ts | 230 ++++++++++++++------- src/services/cogrpc/service.ts | 23 ++- 12 files changed, 347 insertions(+), 147 deletions(-) diff --git a/.dockerignore b/.dockerignore index cbd1ea21d..00a234ee1 100644 --- a/.dockerignore +++ b/.dockerignore @@ -3,6 +3,7 @@ !src src/functional_tests src/**/*.spec.ts +src/**/_test_utils.ts !tsconfig.json !package.json diff --git a/package-lock.json b/package-lock.json index 6f929961c..b22776c9b 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1906,9 +1906,9 @@ } }, "@relaycorp/relaynet-core": { - "version": "1.40.0", - "resolved": "https://registry.npmjs.org/@relaycorp/relaynet-core/-/relaynet-core-1.40.0.tgz", - "integrity": "sha512-xFB2QZB8JHbzSxxSTmxKHWOII8SkGpncrB+jDTREp1gXls+J/eRQkVaebM650NhgoV4CBR5JuBKC8xYfWTVAfg==", + "version": "1.40.1", + "resolved": "https://registry.npmjs.org/@relaycorp/relaynet-core/-/relaynet-core-1.40.1.tgz", + "integrity": "sha512-EMFYC53AE8bVm5cqF8pbFQEfKXzN93I7Uaf61WX9YjF6s0fgPQfqH3RMmun5l5fxeRKHzdp8tUoYo2+bVc2L5Q==", "requires": { "@peculiar/webcrypto": "^1.1.4", "asn1js": "^2.0.26", diff --git a/package.json b/package.json index 51a03fc71..421a2bb3c 100644 --- a/package.json +++ b/package.json @@ -69,7 +69,7 @@ "dependencies": { "@relaycorp/cogrpc": "^1.1.7", "@relaycorp/keystore-vault": "^1.2.0", - "@relaycorp/relaynet-core": "^1.40.0", + "@relaycorp/relaynet-core": "^1.40.1", "@relaycorp/relaynet-pohttp": "^1.5.4", "@typegoose/typegoose": "^7.4.5", "@types/pino": "^6.3.4", diff --git a/src/_test_utils.ts b/src/_test_utils.ts index 11a4df26c..a74837a78 100644 --- a/src/_test_utils.ts +++ b/src/_test_utils.ts @@ -1,9 +1,21 @@ -import { Certificate } from '@relaycorp/relaynet-core'; +import { + CargoCollectionAuthorization, + CargoCollectionRequest, + Certificate, + issueDeliveryAuthorization, + issueGatewayCertificate, + SessionlessEnvelopedData, +} from '@relaycorp/relaynet-core'; import bufferToArray from 'buffer-to-arraybuffer'; import { BinaryLike, createHash, Hash } from 'crypto'; import pino, { symbols as PinoSymbols } from 'pino'; import split2 from 'split2'; +import { reSerializeCertificate } from './services/_test_utils'; + +export const TOMORROW = new Date(); +TOMORROW.setDate(TOMORROW.getDate() + 1); + export const UUID4_REGEX = expect.stringMatching(/^[0-9a-f-]+$/); export const MONGO_ENV_VARS = { @@ -32,6 +44,15 @@ export function arrayBufferFrom(value: string): ArrayBuffer { return bufferToArray(Buffer.from(value)); } +export async function getPromiseRejection(promise: Promise): Promise { + try { + await promise; + } catch (error) { + return error; + } + throw new Error('Expected project to reject'); +} + // tslint:disable-next-line:readonly-array export function mockSpy( spy: jest.MockInstance, @@ -95,17 +116,6 @@ export function partialPinoLog(level: pino.Level, message: string, extraAttribut }); } -export interface PdaChain { - readonly publicGatewayCert: Certificate; - readonly publicGatewayPrivateKey: CryptoKey; - readonly privateGatewayCert: Certificate; - readonly privateGatewayPrivateKey: CryptoKey; - readonly peerEndpointCert: Certificate; - readonly peerEndpointPrivateKey: CryptoKey; - readonly pdaCert: Certificate; - readonly pdaGranteePrivateKey: CryptoKey; -} - function makeSHA256Hash(plaintext: BinaryLike): Hash { return createHash('sha256').update(plaintext); } @@ -135,3 +145,58 @@ export function iterableTake(max: number): (iterable: AsyncIterable) => As } }; } + +export interface CDAChain { + readonly publicGatewayCert: Certificate; + readonly privateGatewayCert: Certificate; +} + +export interface ExternalPdaChain extends CDAChain { + readonly privateGatewayPrivateKey: CryptoKey; + readonly peerEndpointCert: Certificate; + readonly peerEndpointPrivateKey: CryptoKey; + readonly pdaCert: Certificate; + readonly pdaGranteePrivateKey: CryptoKey; +} + +export interface PdaChain extends ExternalPdaChain { + readonly publicGatewayPrivateKey: CryptoKey; +} + +export async function generateCDAChain(pdaChain: ExternalPdaChain): Promise { + const privateGatewayCert = reSerializeCertificate( + await issueGatewayCertificate({ + issuerPrivateKey: pdaChain.privateGatewayPrivateKey, + subjectPublicKey: await pdaChain.privateGatewayCert.getPublicKey(), + validityEndDate: pdaChain.privateGatewayCert.expiryDate, + }), + ); + const publicGatewayCert = reSerializeCertificate( + await issueDeliveryAuthorization({ + issuerCertificate: privateGatewayCert, + issuerPrivateKey: pdaChain.privateGatewayPrivateKey, + subjectPublicKey: await pdaChain.publicGatewayCert.getPublicKey(), + validityEndDate: TOMORROW, + }), + ); + return { privateGatewayCert, publicGatewayCert }; +} + +export async function generateCCA( + recipientAddress: string, + chain: CDAChain, + publicGatewaySelfIssuedCertificate: Certificate, + privateGatewayPrivateKey: CryptoKey, +): Promise { + const ccr = new CargoCollectionRequest(chain.publicGatewayCert); + const ccaPayload = await SessionlessEnvelopedData.encrypt( + ccr.serialize(), + publicGatewaySelfIssuedCertificate, + ); + const cca = new CargoCollectionAuthorization( + recipientAddress, + chain.privateGatewayCert, + Buffer.from(await ccaPayload.serialize()), + ); + return Buffer.from(await cca.serialize(privateGatewayPrivateKey)); +} diff --git a/src/functional_tests/cogrpc_server.test.ts b/src/functional_tests/cogrpc_server.test.ts index b48668ea4..638278d7a 100644 --- a/src/functional_tests/cogrpc_server.test.ts +++ b/src/functional_tests/cogrpc_server.test.ts @@ -1,4 +1,3 @@ -// tslint:disable:no-let import { CogRPCClient, CogRPCError } from '@relaycorp/cogrpc'; import { Cargo, @@ -6,12 +5,21 @@ import { generateRSAKeyPair, issueGatewayCertificate, Parcel, + RecipientAddressType, } from '@relaycorp/relaynet-core'; import { deliverParcel } from '@relaycorp/relaynet-pohttp'; import bufferToArray from 'buffer-to-arraybuffer'; +import grpc from 'grpc'; import { Message, Stan, Subscription } from 'node-nats-streaming'; -import { asyncIterableToArray } from '../_test_utils'; +import { + asyncIterableToArray, + ExternalPdaChain, + generateCCA, + generateCDAChain, + getPromiseRejection, +} from '../_test_utils'; +import { expectBuffersToEqual } from '../services/_test_utils'; import { GW_GOGRPC_URL, GW_POHTTP_URL } from './services'; import { arrayToIterable, connectToNatsStreaming, generatePdaChain, sleep } from './utils'; @@ -68,29 +76,21 @@ describe('Cargo delivery', () => { describe('Cargo collection', () => { test('Authorized CCA should be accepted', async () => { const pdaChain = await generatePdaChain(); - const parcel = new Parcel( - await pdaChain.peerEndpointCert.calculateSubjectPrivateAddress(), - pdaChain.pdaCert, - Buffer.from([]), - { senderCaCertificateChain: [pdaChain.peerEndpointCert, pdaChain.privateGatewayCert] }, - ); - const parcelSerialized = await parcel.serialize(pdaChain.pdaGranteePrivateKey); + const parcelSerialized = await generateDummyParcel(pdaChain); await deliverParcel(GW_POHTTP_URL, parcelSerialized); + await sleep(1); - const cca = new CargoCollectionAuthorization( + const ccaSerialized = await generateCCA( GW_GOGRPC_URL, - pdaChain.privateGatewayCert, - Buffer.from([]), + await generateCDAChain(pdaChain), + pdaChain.publicGatewayCert, + pdaChain.privateGatewayPrivateKey, ); const cogrpcClient = await CogRPCClient.init(GW_GOGRPC_URL); let collectedCargoes: readonly Buffer[]; try { - collectedCargoes = await asyncIterableToArray( - cogrpcClient.collectCargo( - Buffer.from(await cca.serialize(pdaChain.privateGatewayPrivateKey)), - ), - ); + collectedCargoes = await asyncIterableToArray(cogrpcClient.collectCargo(ccaSerialized)); } finally { cogrpcClient.close(); } @@ -104,10 +104,36 @@ describe('Cargo collection', () => { const { payload: cargoMessageSet } = await cargo.unwrapPayload( pdaChain.privateGatewayPrivateKey, ); - await expect(Array.from(cargoMessageSet.messages)).toEqual([parcelSerialized]); + expect(cargoMessageSet.messages).toHaveLength(1); + expectBuffersToEqual(cargoMessageSet.messages[0], parcelSerialized); + }); + + test('Cargo should be signed with Cargo Delivery Authorization', async () => { + const pdaChain = await generatePdaChain(); + await deliverParcel(GW_POHTTP_URL, await generateDummyParcel(pdaChain)); + + await sleep(1); + + const cdaChain = await generateCDAChain(pdaChain); + const ccaSerialized = await generateCCA( + GW_GOGRPC_URL, + cdaChain, + pdaChain.publicGatewayCert, + pdaChain.privateGatewayPrivateKey, + ); + const cogrpcClient = await CogRPCClient.init(GW_GOGRPC_URL); + let collectedCargoes: readonly Buffer[]; + try { + collectedCargoes = await asyncIterableToArray(cogrpcClient.collectCargo(ccaSerialized)); + } finally { + cogrpcClient.close(); + } + + const cargo = await Cargo.deserialize(bufferToArray(collectedCargoes[0])); + await cargo.validate(RecipientAddressType.PRIVATE, [cdaChain.privateGatewayCert]); }); - test('Unauthorized CCA should return zero cargoes', async () => { + test('Unauthorized CCA should be refused', async () => { const unauthorizedSenderKeyPair = await generateRSAKeyPair(); const unauthorizedCertificate = await issueGatewayCertificate({ issuerPrivateKey: unauthorizedSenderKeyPair.privateKey, @@ -123,9 +149,10 @@ describe('Cargo collection', () => { const ccaSerialized = Buffer.from(await cca.serialize(unauthorizedSenderKeyPair.privateKey)); const cogrpcClient = await CogRPCClient.init(GW_GOGRPC_URL); try { - await expect( + const error = await getPromiseRejection( asyncIterableToArray(cogrpcClient.collectCargo(ccaSerialized)), - ).resolves.toHaveLength(0); + ); + expect(error.cause()).toHaveProperty('code', grpc.status.UNAUTHENTICATED); } finally { cogrpcClient.close(); } @@ -133,25 +160,37 @@ describe('Cargo collection', () => { test('CCAs should not be reusable', async () => { const pdaChain = await generatePdaChain(); - const cca = new CargoCollectionAuthorization( + const cdaChain = await generateCDAChain(pdaChain); + const ccaSerialized = await generateCCA( GW_GOGRPC_URL, - pdaChain.privateGatewayCert, - Buffer.from([]), + cdaChain, + pdaChain.publicGatewayCert, + pdaChain.privateGatewayPrivateKey, ); - const ccaSerialized = Buffer.from(await cca.serialize(pdaChain.privateGatewayPrivateKey)); const cogrpcClient = await CogRPCClient.init(GW_GOGRPC_URL); try { await expect(asyncIterableToArray(cogrpcClient.collectCargo(ccaSerialized))).toResolve(); - await expect( + const error = await getPromiseRejection( asyncIterableToArray(cogrpcClient.collectCargo(ccaSerialized)), - ).rejects.toBeInstanceOf(CogRPCError); + ); + expect(error.cause()).toHaveProperty('code', grpc.status.PERMISSION_DENIED); } finally { cogrpcClient.close(); } }); }); +async function generateDummyParcel(pdaChain: ExternalPdaChain): Promise { + const parcel = new Parcel( + await pdaChain.peerEndpointCert.calculateSubjectPrivateAddress(), + pdaChain.pdaCert, + Buffer.from([]), + { senderCaCertificateChain: [pdaChain.peerEndpointCert, pdaChain.privateGatewayCert] }, + ); + return parcel.serialize(pdaChain.pdaGranteePrivateKey); +} + async function getLastQueueMessage(): Promise { const stanConnection = await connectToNatsStreaming(); const subscription = subscribeToCRCChannel(stanConnection); diff --git a/src/functional_tests/internet_e2e.test.ts b/src/functional_tests/internet_e2e.test.ts index d769b73ac..203fa5c79 100644 --- a/src/functional_tests/internet_e2e.test.ts +++ b/src/functional_tests/internet_e2e.test.ts @@ -2,7 +2,6 @@ import { CogRPCClient } from '@relaycorp/cogrpc'; import { VaultPrivateKeyStore } from '@relaycorp/keystore-vault'; import { Cargo, - CargoCollectionAuthorization, CargoMessageSet, Certificate, issueDeliveryAuthorization, @@ -19,9 +18,15 @@ import { get as getEnvVar } from 'env-var'; import pipe from 'it-pipe'; import uuid from 'uuid-random'; -import { asyncIterableToArray, iterableTake } from '../_test_utils'; +import { + asyncIterableToArray, + ExternalPdaChain, + generateCCA, + generateCDAChain, + iterableTake, +} from '../_test_utils'; import { GW_GOGRPC_URL, GW_POWEB_LOCAL_PORT, PONG_ENDPOINT_ADDRESS } from './services'; -import { arrayToIterable, ExternalPdaChain, generatePdaChain, IS_GITHUB, sleep } from './utils'; +import { arrayToIterable, generatePdaChain, IS_GITHUB, sleep } from './utils'; test('Sending pings via PoWeb and receiving pongs via PoHTTP', async () => { const powebClient = PoWebClient.initLocal(GW_POWEB_LOCAL_PORT); @@ -64,14 +69,14 @@ test('Sending pings via PoWeb and receiving pongs via PoHTTP', async () => { test('Sending pings via CogRPC and receiving pongs via PoHTTP', async () => { const pongEndpointSessionCertificate = await getPongEndpointKeyPairs(); - const gwPDAChain = await generatePdaChain(); + const pdaChain = await generatePdaChain(); const pingId = Buffer.from(uuid()); const pingParcelData = await makePingParcel( pingId, pongEndpointSessionCertificate.identityCert, pongEndpointSessionCertificate.sessionCert, - gwPDAChain, + pdaChain, ); const cogRPCClient = await CogRPCClient.init(GW_GOGRPC_URL); @@ -79,7 +84,7 @@ test('Sending pings via CogRPC and receiving pongs via PoHTTP', async () => { // Deliver the ping message encapsulated in a cargo const cargoSerialized = await encapsulateParcelsInCargo( [pingParcelData.parcelSerialized], - gwPDAChain, + pdaChain, ); await asyncIterableToArray( cogRPCClient.deliverCargo( @@ -90,11 +95,16 @@ test('Sending pings via CogRPC and receiving pongs via PoHTTP', async () => { await sleep(IS_GITHUB ? 4 : 2); // Collect the pong message encapsulated in a cargo - const collectedCargoes = await asyncIterableToArray( - cogRPCClient.collectCargo(await makeCCA(gwPDAChain)), + const cdaChain = await generateCDAChain(pdaChain); + const ccaSerialized = await generateCCA( + GW_GOGRPC_URL, + cdaChain, + pdaChain.publicGatewayCert, + pdaChain.privateGatewayPrivateKey, ); + const collectedCargoes = await asyncIterableToArray(cogRPCClient.collectCargo(ccaSerialized)); expect(collectedCargoes).toHaveLength(1); - const collectedMessages = await extractMessagesFromCargo(collectedCargoes[0], gwPDAChain); + const collectedMessages = await extractMessagesFromCargo(collectedCargoes[0], pdaChain); expect(collectedMessages).toHaveLength(2); expect(ParcelCollectionAck.deserialize(collectedMessages[0])).toHaveProperty( 'parcelId', @@ -200,15 +210,6 @@ async function encapsulateParcelsInCargo( return Buffer.from(await cargo.serialize(gwPDAChain.privateGatewayPrivateKey)); } -async function makeCCA(gwPDAChain: ExternalPdaChain): Promise { - const cca = new CargoCollectionAuthorization( - GW_GOGRPC_URL, - gwPDAChain.privateGatewayCert, - Buffer.from([]), - ); - return Buffer.from(await cca.serialize(gwPDAChain.privateGatewayPrivateKey)); -} - async function extractMessagesFromCargo( cargoSerialized: Buffer, gwPDAChain: ExternalPdaChain, diff --git a/src/functional_tests/poweb_server.test.ts b/src/functional_tests/poweb_server.test.ts index 1a462d177..b10af4a0c 100644 --- a/src/functional_tests/poweb_server.test.ts +++ b/src/functional_tests/poweb_server.test.ts @@ -16,11 +16,10 @@ import { } from '@relaycorp/relaynet-poweb'; import pipe from 'it-pipe'; -import { asyncIterableToArray, iterableTake } from '../_test_utils'; +import { asyncIterableToArray, ExternalPdaChain, iterableTake } from '../_test_utils'; import { expectBuffersToEqual } from '../services/_test_utils'; import { GW_POWEB_LOCAL_PORT } from './services'; import { - ExternalPdaChain, generatePdaChain, getPublicGatewayCertificate, registerPrivateGateway, diff --git a/src/functional_tests/utils.ts b/src/functional_tests/utils.ts index 1eead4034..3fe157602 100644 --- a/src/functional_tests/utils.ts +++ b/src/functional_tests/utils.ts @@ -13,12 +13,10 @@ import { get as getEnvVar } from 'env-var'; import { connect as stanConnect, Stan } from 'node-nats-streaming'; import uuid from 'uuid-random'; -import { PdaChain } from '../_test_utils'; +import { ExternalPdaChain } from '../_test_utils'; import { initVaultKeyStore } from '../backingServices/privateKeyStore'; import { GW_POWEB_LOCAL_PORT } from './services'; -export type ExternalPdaChain = Omit; - export const IS_GITHUB = getEnvVar('IS_GITHUB').asBool(); export const OBJECT_STORAGE_CLIENT = initS3Client(); diff --git a/src/services/cogrpc/server.spec.ts b/src/services/cogrpc/server.spec.ts index 0db76c4b4..ae482b437 100644 --- a/src/services/cogrpc/server.spec.ts +++ b/src/services/cogrpc/server.spec.ts @@ -68,11 +68,11 @@ describe('runServer', () => { ); }); - test('Server should accept metadata of up to 4 kb', async () => { + test('Server should accept metadata of up to 6 kb', async () => { await runServer(); expect(grpc.Server).toBeCalledWith( - expect.objectContaining({ 'grpc.max_metadata_size': 4_000 }), + expect.objectContaining({ 'grpc.max_metadata_size': 6_000 }), ); }); diff --git a/src/services/cogrpc/server.ts b/src/services/cogrpc/server.ts index ef7af4c40..cadc696e7 100644 --- a/src/services/cogrpc/server.ts +++ b/src/services/cogrpc/server.ts @@ -12,7 +12,7 @@ const NETLOC = '0.0.0.0:8080'; const MAX_RECEIVED_MESSAGE_LENGTH = MAX_RAMF_MESSAGE_SIZE + 256; // Include protobuf overhead const MAX_CONCURRENT_CALLS = 3; -const MAX_METADATA_SIZE = 4_000; // ~2.8 kib for a base64-encoded CCA + overhead +const MAX_METADATA_SIZE = 6_000; // ~4.9 kib for a base64-encoded CCA + overhead const MAX_CONNECTION_AGE_MINUTES = 15; const MAX_CONNECTION_AGE_GRACE_SECONDS = 30; const MAX_CONNECTION_IDLE_SECONDS = 5; diff --git a/src/services/cogrpc/service.spec.ts b/src/services/cogrpc/service.spec.ts index aad766d33..24e495e58 100644 --- a/src/services/cogrpc/service.spec.ts +++ b/src/services/cogrpc/service.spec.ts @@ -1,5 +1,3 @@ -/* tslint:disable:no-let */ - import { CargoDelivery, CargoDeliveryAck, CargoRelayServerMethodSet } from '@relaycorp/cogrpc'; import { VaultPrivateKeyStore } from '@relaycorp/keystore-vault'; import { @@ -13,6 +11,8 @@ import { Parcel, ParcelCollectionAck, RAMFSyntaxError, + RecipientAddressType, + SessionlessEnvelopedData, } from '@relaycorp/relaynet-core'; import * as typegoose from '@typegoose/typegoose'; import bufferToArray from 'buffer-to-arraybuffer'; @@ -21,7 +21,11 @@ import * as grpc from 'grpc'; import mongoose from 'mongoose'; import { + arrayBufferFrom, arrayToAsyncIterable, + CDAChain, + generateCCA, + generateCDAChain, makeMockLogging, mockSpy, partialPinoLog, @@ -51,11 +55,13 @@ const NATS_CLUSTER_ID = 'nats-cluster-id'; const TOMORROW = new Date(); TOMORROW.setDate(TOMORROW.getDate() + 1); -let PDA_CHAIN: PdaChain; -let PEER_GATEWAY_ADDRESS: string; +let pdaChain: PdaChain; +let cdaChain: CDAChain; +let peerGatewayAddress: string; beforeAll(async () => { - PDA_CHAIN = await generatePdaChain(); - PEER_GATEWAY_ADDRESS = await PDA_CHAIN.privateGatewayCert.calculateSubjectPrivateAddress(); + pdaChain = await generatePdaChain(); + cdaChain = await generateCDAChain(pdaChain); + peerGatewayAddress = await pdaChain.privateGatewayCert.calculateSubjectPrivateAddress(); }); const MOCK_MONGOOSE_CONNECTION: mongoose.Connection = new EventEmitter() as any; @@ -134,8 +140,8 @@ describe('deliverCargo', () => { let CARGO: Cargo; let CARGO_SERIALIZATION: Buffer; beforeAll(async () => { - CARGO = new Cargo(COGRPC_ADDRESS, PDA_CHAIN.privateGatewayCert, Buffer.from('payload')); - CARGO_SERIALIZATION = Buffer.from(await CARGO.serialize(PDA_CHAIN.privateGatewayPrivateKey)); + CARGO = new Cargo(COGRPC_ADDRESS, pdaChain.privateGatewayCert, Buffer.from('payload')); + CARGO_SERIALIZATION = Buffer.from(await CARGO.serialize(pdaChain.privateGatewayPrivateKey)); }); let NATS_CLIENT: natsStreaming.NatsStreamingClient; @@ -172,7 +178,7 @@ describe('deliverCargo', () => { const RETRIEVE_OWN_CERTIFICATES_SPY = mockSpy( jest.spyOn(certs, 'retrieveOwnCertificates'), - () => [PDA_CHAIN.publicGatewayCert], + () => [pdaChain.publicGatewayCert], ); test('NATS Streaming publisher should be initialized upfront', async () => { @@ -223,10 +229,10 @@ describe('deliverCargo', () => { // The invalid message is followed by a valid one to check that processing continues const invalidCargo = new Cargo( COGRPC_ADDRESS, - PDA_CHAIN.peerEndpointCert, + pdaChain.peerEndpointCert, Buffer.from('payload'), ); - const invalidCargoSerialized = await invalidCargo.serialize(PDA_CHAIN.peerEndpointPrivateKey); + const invalidCargoSerialized = await invalidCargo.serialize(pdaChain.peerEndpointPrivateKey); const invalidDeliveryId = 'invalid'; CALL.output.push( { cargo: Buffer.from(invalidCargoSerialized), id: invalidDeliveryId }, @@ -272,7 +278,7 @@ describe('deliverCargo', () => { cargoId: CARGO.id, grpcClient: CALL.getPeer(), grpcMethod: 'deliverCargo', - peerGatewayAddress: PEER_GATEWAY_ADDRESS, + peerGatewayAddress, }), ); }); @@ -413,8 +419,8 @@ describe('collectCargo', () => { const MOCK_FETCH_NODE_KEY = mockSpy( jest.spyOn(VaultPrivateKeyStore.prototype, 'fetchNodeKey'), async () => ({ - certificate: PDA_CHAIN.publicGatewayCert, - privateKey: PDA_CHAIN.publicGatewayPrivateKey, + certificate: pdaChain.publicGatewayCert, + privateKey: pdaChain.publicGatewayPrivateKey, }), ); @@ -449,16 +455,16 @@ describe('collectCargo', () => { DUMMY_PARCEL_SERIALIZED = Buffer.from(await DUMMY_PARCEL.serialize(keyPair.privateKey)); }); - let CCA: CargoCollectionAuthorization; - let AUTHORIZATION_METADATA: grpc.MetadataValue; + let ccaSerialized: Buffer; + let authorizationMetadata: grpc.MetadataValue; beforeAll(async () => { - CCA = new CargoCollectionAuthorization( + ccaSerialized = await generateCCA( COGRPC_ADDRESS, - PDA_CHAIN.privateGatewayCert, - Buffer.from([]), + cdaChain, + pdaChain.publicGatewayCert, + pdaChain.privateGatewayPrivateKey, ); - const ccaSerialized = Buffer.from(await CCA.serialize(PDA_CHAIN.privateGatewayPrivateKey)); - AUTHORIZATION_METADATA = `Relaynet-CCA ${ccaSerialized.toString('base64')}`; + authorizationMetadata = `Relaynet-CCA ${ccaSerialized.toString('base64')}`; }); describe('CCA validation', () => { @@ -540,8 +546,67 @@ describe('collectCargo', () => { cb(); }); - const ccaSerialized = Buffer.from('I am not really a RAMF message'); - CALL.metadata.add('Authorization', `Relaynet-CCA ${ccaSerialized.toString('base64')}`); + const invalidCCASerialized = Buffer.from('I am not really a RAMF message'); + CALL.metadata.add('Authorization', `Relaynet-CCA ${invalidCCASerialized.toString('base64')}`); + + await SERVICE.collectCargo(CALL.convertToGrpcStream()); + }); + + test('UNAUTHENTICATED should be returned if payload is not an EnvelopedData value', async (cb) => { + CALL.on('error', (error) => { + expect(MOCK_LOGS).toContainEqual(invalidCCRLog('CMSError')); + expect(error).toEqual({ + code: grpc.status.UNAUTHENTICATED, + message: 'Invalid CCA', + }); + + cb(); + }); + + const invalidCCASerialized = await generateCCAForPayload(COGRPC_ADDRESS, new ArrayBuffer(0)); + CALL.metadata.add('Authorization', `Relaynet-CCA ${invalidCCASerialized.toString('base64')}`); + + await SERVICE.collectCargo(CALL.convertToGrpcStream()); + }); + + test('UNAUTHENTICATED should be returned if EnvelopedData cannot be decrypted', async (cb) => { + CALL.on('error', (error) => { + expect(MOCK_LOGS).toContainEqual(invalidCCRLog('CMSError')); + expect(error).toEqual({ + code: grpc.status.UNAUTHENTICATED, + message: 'Invalid CCA', + }); + + cb(); + }); + + const payload = await SessionlessEnvelopedData.encrypt( + new ArrayBuffer(0), + pdaChain.pdaCert, // The public gateway doesn't have this key + ); + const invalidCCASerialized = await generateCCAForPayload(COGRPC_ADDRESS, payload.serialize()); + CALL.metadata.add('Authorization', `Relaynet-CCA ${invalidCCASerialized.toString('base64')}`); + + await SERVICE.collectCargo(CALL.convertToGrpcStream()); + }); + + test('UNAUTHENTICATED should be returned if CCR is malformed', async (cb) => { + CALL.on('error', (error) => { + expect(MOCK_LOGS).toContainEqual(invalidCCRLog(InvalidMessageError.name)); + expect(error).toEqual({ + code: grpc.status.UNAUTHENTICATED, + message: 'Invalid CCA', + }); + + cb(); + }); + + const payload = await SessionlessEnvelopedData.encrypt( + arrayBufferFrom('not a valid CCR'), + pdaChain.publicGatewayCert, + ); + const invalidCCASerialized = await generateCCAForPayload(COGRPC_ADDRESS, payload.serialize()); + CALL.metadata.add('Authorization', `Relaynet-CCA ${invalidCCASerialized.toString('base64')}`); await SERVICE.collectCargo(CALL.convertToGrpcStream()); }); @@ -549,7 +614,7 @@ describe('collectCargo', () => { test('INVALID_ARGUMENT should be returned if CCA is not bound for current gateway', async (cb) => { const cca = new CargoCollectionAuthorization( `${COGRPC_ADDRESS}/path`, - PDA_CHAIN.privateGatewayCert, + pdaChain.privateGatewayCert, Buffer.from([]), ); CALL.on('error', (error) => { @@ -558,7 +623,7 @@ describe('collectCargo', () => { ccaRecipientAddress: cca.recipientAddress, grpcClient: CALL.getPeer(), grpcMethod: 'collectCargo', - peerGatewayAddress: PEER_GATEWAY_ADDRESS, + peerGatewayAddress, }), ); expect(error).toEqual({ @@ -569,8 +634,10 @@ describe('collectCargo', () => { cb(); }); - const ccaSerialized = Buffer.from(await cca.serialize(PDA_CHAIN.privateGatewayPrivateKey)); - CALL.metadata.add('Authorization', `Relaynet-CCA ${ccaSerialized.toString('base64')}`); + const invalidCCASerialized = Buffer.from( + await cca.serialize(pdaChain.privateGatewayPrivateKey), + ); + CALL.metadata.add('Authorization', `Relaynet-CCA ${invalidCCASerialized.toString('base64')}`); await SERVICE.collectCargo(CALL.convertToGrpcStream()); }); @@ -581,7 +648,7 @@ describe('collectCargo', () => { partialPinoLog('info', 'Refusing CCA that was already fulfilled', { grpcClient: CALL.getPeer(), grpcMethod: 'collectCargo', - peerGatewayAddress: PEER_GATEWAY_ADDRESS, + peerGatewayAddress, }), ); expect(error).toEqual({ @@ -593,11 +660,23 @@ describe('collectCargo', () => { }); MOCK_WAS_CCA_FULFILLED.mockResolvedValue(true); - CALL.metadata.add('Authorization', AUTHORIZATION_METADATA); + CALL.metadata.add('Authorization', authorizationMetadata); await SERVICE.collectCargo(CALL.convertToGrpcStream()); }); + async function generateCCAForPayload( + recipientAddress: string, + payload: ArrayBuffer, + ): Promise { + const cca = new CargoCollectionAuthorization( + recipientAddress, + pdaChain.privateGatewayCert, + Buffer.from(payload), + ); + return Buffer.from(await cca.serialize(pdaChain.privateGatewayPrivateKey)); + } + function invalidCCALog(errorMessage: string): ReturnType { return partialPinoLog('info', 'Refusing malformed/invalid CCA', { grpcClient: CALL.getPeer(), @@ -605,10 +684,18 @@ describe('collectCargo', () => { reason: errorMessage, }); } + + function invalidCCRLog(errorTypeName: string): ReturnType { + return partialPinoLog('info', 'Failed to extract Cargo Collection Request', { + err: expect.objectContaining({ type: errorTypeName }), + grpcClient: CALL.getPeer(), + grpcMethod: 'collectCargo', + }); + } }); test('Parcel store should be bound to correct bucket', async () => { - CALL.metadata.add('Authorization', AUTHORIZATION_METADATA); + CALL.metadata.add('Authorization', authorizationMetadata); await SERVICE.collectCargo(CALL.convertToGrpcStream()); @@ -619,18 +706,18 @@ describe('collectCargo', () => { }); test('Parcels retrieved should be limited to sender of CCA', async () => { - CALL.metadata.add('Authorization', AUTHORIZATION_METADATA); + CALL.metadata.add('Authorization', authorizationMetadata); await SERVICE.collectCargo(CALL.convertToGrpcStream()); expect(MOCK_RETRIEVE_ACTIVE_PARCELS).toBeCalledWith( - PEER_GATEWAY_ADDRESS, - partialPinoLogger({ peerGatewayAddress: PEER_GATEWAY_ADDRESS }) as any, + peerGatewayAddress, + partialPinoLogger({ peerGatewayAddress }) as any, ); }); test('Call should end immediately if there is no cargo for specified gateway', async () => { - CALL.metadata.add('Authorization', AUTHORIZATION_METADATA); + CALL.metadata.add('Authorization', authorizationMetadata); await SERVICE.collectCargo(CALL.convertToGrpcStream()); @@ -639,7 +726,7 @@ describe('collectCargo', () => { }); test('One cargo should be returned if all messages fit in it', async () => { - CALL.metadata.add('Authorization', AUTHORIZATION_METADATA); + CALL.metadata.add('Authorization', authorizationMetadata); MOCK_RETRIEVE_ACTIVE_PARCELS.mockReturnValue( arrayToAsyncIterable([ @@ -664,7 +751,7 @@ describe('collectCargo', () => { }); test('Call should end after cargo has been delivered', async () => { - CALL.metadata.add('Authorization', AUTHORIZATION_METADATA); + CALL.metadata.add('Authorization', authorizationMetadata); MOCK_RETRIEVE_ACTIVE_PARCELS.mockReturnValue( arrayToAsyncIterable([ @@ -683,19 +770,19 @@ describe('collectCargo', () => { }); test('PCAs should be limited to the sender of the CCA', async () => { - CALL.metadata.add('Authorization', AUTHORIZATION_METADATA); + CALL.metadata.add('Authorization', authorizationMetadata); await SERVICE.collectCargo(CALL.convertToGrpcStream()); expect(MOCK_GENERATE_PCAS).toBeCalledTimes(1); expect(MOCK_GENERATE_PCAS).toBeCalledWith( - await PDA_CHAIN.privateGatewayCert.calculateSubjectPrivateAddress(), + await pdaChain.privateGatewayCert.calculateSubjectPrivateAddress(), MOCK_MONGOOSE_CONNECTION, ); }); test('PCAs should be included in payload', async () => { - CALL.metadata.add('Authorization', AUTHORIZATION_METADATA); + CALL.metadata.add('Authorization', authorizationMetadata); MOCK_RETRIEVE_ACTIVE_PARCELS.mockReturnValue( arrayToAsyncIterable([ @@ -719,29 +806,44 @@ describe('collectCargo', () => { await validateCargoDelivery(CALL.input[0], [pcaSerialized, DUMMY_PARCEL_SERIALIZED]); }); - test('Cargoes should be signed with current key', async () => { - CALL.metadata.add('Authorization', AUTHORIZATION_METADATA); + test('Cargo should be signed with the current key', async () => { + CALL.metadata.add('Authorization', authorizationMetadata); + MOCK_RETRIEVE_ACTIVE_PARCELS.mockReturnValue( + arrayToAsyncIterable([ + { + body: DUMMY_PARCEL_SERIALIZED, + expiryDate: TOMORROW, + extra: null, + key: 'prefix/1.parcel', + }, + ]), + ); await SERVICE.collectCargo(CALL.convertToGrpcStream()); const gatewayKeyId = Buffer.from(GATEWAY_KEY_ID_BASE64, 'base64'); expect(MOCK_FETCH_NODE_KEY).toBeCalledWith(gatewayKeyId); + + expect(CALL.input).toHaveLength(1); + const cargo = await Cargo.deserialize(bufferToArray(CALL.input[0].cargo)); + await cargo.validate(RecipientAddressType.PRIVATE, [cdaChain.privateGatewayCert]); }); test('CCA should be logged as fulfilled to make sure it is only used once', async () => { - CALL.metadata.add('Authorization', AUTHORIZATION_METADATA); + CALL.metadata.add('Authorization', authorizationMetadata); await SERVICE.collectCargo(CALL.convertToGrpcStream()); expect(MOCK_RECORD_CCA_FULFILLMENT).toBeCalledTimes(1); + const cca = await CargoCollectionAuthorization.deserialize(bufferToArray(ccaSerialized)); expect(MOCK_RECORD_CCA_FULFILLMENT).toBeCalledWith( - expect.objectContaining({ id: CCA.id }), + expect.objectContaining({ id: cca.id }), MOCK_MONGOOSE_CONNECTION, ); }); test('CCA fulfillment should be logged and end the call', async () => { - CALL.metadata.add('Authorization', AUTHORIZATION_METADATA); + CALL.metadata.add('Authorization', authorizationMetadata); await SERVICE.collectCargo(CALL.convertToGrpcStream()); @@ -750,45 +852,21 @@ describe('collectCargo', () => { cargoesCollected: 0, grpcClient: CALL.getPeer(), grpcMethod: 'collectCargo', - peerGatewayAddress: PEER_GATEWAY_ADDRESS, + peerGatewayAddress, }), ); expect(CALL.end).toBeCalledWith(); }); - test('Errors while generating cargo should be logged and end the call', async (cb) => { - const err = new Error('Whoops'); - MOCK_FETCH_NODE_KEY.mockRejectedValue(err); - CALL.metadata.add('Authorization', AUTHORIZATION_METADATA); - - CALL.on('error', async (callError) => { - expect(MOCK_LOGS).toContainEqual( - partialPinoLog('error', 'Failed to send cargo', { - err: expect.objectContaining({ message: err.message }), - grpcClient: CALL.getPeer(), - grpcMethod: 'collectCargo', - peerGatewayAddress: PEER_GATEWAY_ADDRESS, - }), - ); - - expect(callError).toEqual({ - code: grpc.status.UNAVAILABLE, - message: 'Internal server error; please try again later', - }); - - expect(MOCK_RECORD_CCA_FULFILLMENT).not.toBeCalled(); - - cb(); - }); - - await SERVICE.collectCargo(CALL.convertToGrpcStream()); - }); + test.todo('CCA payload encryption key should be stored if using channel session'); describe('Errors while generating cargo', () => { const err = new Error('Whoops'); beforeEach(() => { - MOCK_FETCH_NODE_KEY.mockRejectedValue(err); - CALL.metadata.add('Authorization', AUTHORIZATION_METADATA); + MOCK_RETRIEVE_ACTIVE_PARCELS.mockImplementation(async function* (): AsyncIterable { + throw err; + }); + CALL.metadata.add('Authorization', authorizationMetadata); }); test('Error should be logged and end the call', async (cb) => { @@ -798,7 +876,7 @@ describe('collectCargo', () => { err: expect.objectContaining({ message: err.message }), grpcClient: CALL.getPeer(), grpcMethod: 'collectCargo', - peerGatewayAddress: PEER_GATEWAY_ADDRESS, + peerGatewayAddress, }), ); cb(); @@ -848,7 +926,7 @@ describe('collectCargo', () => { async function unwrapCargoMessages(cargoSerialized: Buffer): Promise { const cargo = await Cargo.deserialize(bufferToArray(cargoSerialized)); - const { payload } = await cargo.unwrapPayload(PDA_CHAIN.privateGatewayPrivateKey); + const { payload } = await cargo.unwrapPayload(pdaChain.privateGatewayPrivateKey); return payload; } }); diff --git a/src/services/cogrpc/service.ts b/src/services/cogrpc/service.ts index 45e48a9bb..81761717f 100644 --- a/src/services/cogrpc/service.ts +++ b/src/services/cogrpc/service.ts @@ -3,6 +3,7 @@ import { VaultPrivateKeyStore } from '@relaycorp/keystore-vault'; import { Cargo, CargoCollectionAuthorization, + CargoCollectionRequest, CargoMessageStream, Gateway, } from '@relaycorp/relaynet-core'; @@ -186,6 +187,19 @@ async function collectCargo( return; } + let ccr: CargoCollectionRequest; + try { + const payload = await cca.unwrapPayload(vaultKeyStore); + ccr = payload.payload; + } catch (err) { + ccaAwareLogger.info({ err }, 'Failed to extract Cargo Collection Request'); + call.emit('error', { + code: grpc.status.UNAUTHENTICATED, + message: 'Invalid CCA', + }); + return; + } + if (await wasCCAFulfilled(cca, mongooseConnection)) { ccaAwareLogger.info('Refusing CCA that was already fulfilled'); call.emit('error', { @@ -195,13 +209,18 @@ async function collectCargo( return; } - // tslint:disable-next-line:no-let let cargoesCollected = 0; async function* encapsulateMessagesInCargo(messages: CargoMessageStream): AsyncIterable { const publicKeyStore = new MongoPublicKeyStore(mongooseConnection); const gateway = new Gateway(vaultKeyStore, publicKeyStore); - yield* await gateway.generateCargoes(messages, cca.senderCertificate, currentKeyId); + const { privateKey } = await vaultKeyStore.fetchNodeKey(currentKeyId); + yield* await gateway.generateCargoes( + messages, + cca.senderCertificate, + privateKey, + ccr.cargoDeliveryAuthorization, + ); } async function sendCargoes(cargoesSerialized: AsyncIterable): Promise {