From 52b5b722824c84bb604addbb1956071efd9c8044 Mon Sep 17 00:00:00 2001 From: PLAMEN IVANOV Date: Wed, 29 May 2024 02:17:11 -0400 Subject: [PATCH 1/3] Add API key to gRPC server and client --- packages/grpc/package.json | 2 +- packages/grpc/src/__test__/grpc.test.ts | 81 ++++++++++++++++--- packages/grpc/src/index.ts | 10 ++- packages/grpc/src/internal/GrpcInputSource.ts | 32 +++++++- packages/grpc/src/internal/client.ts | 18 ++++- 5 files changed, 118 insertions(+), 25 deletions(-) diff --git a/packages/grpc/package.json b/packages/grpc/package.json index 4725d6c7..4d0ad6f6 100644 --- a/packages/grpc/package.json +++ b/packages/grpc/package.json @@ -1,6 +1,6 @@ { "name": "@walmartlabs/cookie-cutter-grpc", - "version": "1.6.0-beta.2", + "version": "1.6.0-beta.3", "license": "Apache-2.0", "main": "dist/index.js", "types": "dist/index.d.ts", diff --git a/packages/grpc/src/__test__/grpc.test.ts b/packages/grpc/src/__test__/grpc.test.ts index d1cd3aff..a5160815 100644 --- a/packages/grpc/src/__test__/grpc.test.ts +++ b/packages/grpc/src/__test__/grpc.test.ts @@ -27,6 +27,7 @@ import { } from ".."; import { sample } from "./Sample"; +const apiKey = "token"; let nextPort = 56011; export interface ISampleService { @@ -78,16 +79,19 @@ export const SampleServiceDefinition = { }, }; -function testApp(handler: any, host?: string): CancelablePromise { +function testApp(handler: any, host?: string, apiKey?: string): CancelablePromise { return Application.create() .input() .add( - grpcSource({ - port: nextPort, - host, - definitions: [SampleServiceDefinition], - skipNoStreamingValidation: true, - }) + grpcSource( + { + port: nextPort, + host, + definitions: [SampleServiceDefinition], + skipNoStreamingValidation: true, + }, + apiKey + ) ) .done() .dispatch(handler) @@ -96,13 +100,18 @@ function testApp(handler: any, host?: string): CancelablePromise { async function createClient( host?: string, - config?: Partial + config?: Partial, + apiKey?: string ): Promise { - const client = grpcClient({ - endpoint: `${host || "localhost"}:${nextPort++}`, - definition: SampleServiceDefinition, - ...config, - }); + const client = grpcClient( + { + endpoint: `${host || "localhost"}:${nextPort++}`, + definition: SampleServiceDefinition, + ...config, + }, + undefined, + apiKey + ); return client; } @@ -126,6 +135,29 @@ describe("gRPC source", () => { } }); + it("serves requests with api key validation", async () => { + const app = testApp( + { + onNoStreaming: async ( + request: sample.ISampleRequest, + _: IDispatchContext + ): Promise => { + return { name: request.id.toString() }; + }, + }, + undefined, + apiKey + ); + try { + const client = await createClient(undefined, undefined, apiKey); + const response = await client.NoStreaming({ id: 15 }); + expect(response).toMatchObject({ name: "15" }); + } finally { + app.cancel(); + await app; + } + }); + it("serves response streams", async () => { const app = testApp({ onStreamingOut: async ( @@ -235,6 +267,29 @@ describe("gRPC source", () => { } }); + it("throws error for missing/invalid api key", async () => { + const app = testApp( + { + onNoStreaming: async ( + request: sample.ISampleRequest, + _: IDispatchContext + ): Promise => { + return { name: request.id.toString() }; + }, + }, + undefined, + apiKey + ); + try { + const client = await createClient(); + const response = client.NoStreaming({ id: 15 }); + await expect(response).rejects.toThrowError(/Invalid API Key/); + } finally { + app.cancel(); + await app; + } + }); + it("validates that no streaming operations are exposed", () => { const a = () => grpcSource({ diff --git a/packages/grpc/src/index.ts b/packages/grpc/src/index.ts index eccf046e..301272fe 100644 --- a/packages/grpc/src/index.ts +++ b/packages/grpc/src/index.ts @@ -80,7 +80,8 @@ export interface IResponseStream { } export function grpcSource( - configuration: IGrpcServerConfiguration & IGrpcConfiguration + configuration: IGrpcServerConfiguration & IGrpcConfiguration, + apiKey?: string ): IInputSource & IRequireInitialization { configuration = config.parse( GrpcSourceConfiguration, @@ -90,7 +91,7 @@ export function grpcSource( allocator: Buffer, } ); - return new GrpcInputSource(configuration); + return new GrpcInputSource(configuration, apiKey); } export function grpcMsg(operation: IGrpcServiceMethod, request: any): IMessage { @@ -102,7 +103,8 @@ export function grpcMsg(operation: IGrpcServiceMethod, request: any): IMessage { export function grpcClient( configuration: IGrpcClientConfiguration & IGrpcConfiguration, - certPath?: string + certPath?: string, + apiKey?: string ): T & IRequireInitialization & IDisposable { configuration = config.parse( GrpcClientConfiguration, @@ -122,5 +124,5 @@ export function grpcClient( }, } ); - return createGrpcClient(configuration, certPath); + return createGrpcClient(configuration, certPath, apiKey); } diff --git a/packages/grpc/src/internal/GrpcInputSource.ts b/packages/grpc/src/internal/GrpcInputSource.ts index 01a9c69f..fe7c4d18 100644 --- a/packages/grpc/src/internal/GrpcInputSource.ts +++ b/packages/grpc/src/internal/GrpcInputSource.ts @@ -19,6 +19,7 @@ import { OpenTracingTagKeys, } from "@walmartlabs/cookie-cutter-core"; import { + Metadata, sendUnaryData, Server, ServerCredentials, @@ -58,8 +59,12 @@ export class GrpcInputSource implements IInputSource, IRequireInitialization { private logger: ILogger; private tracer: Tracer; private metrics: IMetrics; + private apiKey: string; - constructor(private readonly config: IGrpcServerConfiguration & IGrpcConfiguration) { + constructor( + private readonly config: IGrpcServerConfiguration & IGrpcConfiguration, + apiKey?: string + ) { if (!config.skipNoStreamingValidation) { for (const def of config.definitions) { for (const key of Object.keys(def)) { @@ -79,6 +84,7 @@ export class GrpcInputSource implements IInputSource, IRequireInitialization { this.logger = DefaultComponentContext.logger; this.tracer = DefaultComponentContext.tracer; this.metrics = DefaultComponentContext.metrics; + this.apiKey = apiKey; } public async initialize(context: IComponentContext): Promise { @@ -180,7 +186,11 @@ export class GrpcInputSource implements IInputSource, IRequireInitialization { if (value !== undefined) { callback(undefined, value); } else if (error !== undefined) { - callback(this.createError(error), null); + if ((error as ServerErrorResponse).code !== undefined) { + callback(error, null); + } else { + callback(this.createError(error), null); + } } else { callback( this.createError("not implemented", status.UNIMPLEMENTED), @@ -198,7 +208,15 @@ export class GrpcInputSource implements IInputSource, IRequireInitialization { path: method.path, }); }); - + if (this.apiKey) { + if (!this.isApiKeyValid(call.metadata)) { + await msgRef.release( + undefined, + this.createError("Invalid API Key", status.UNAUTHENTICATED) + ); + return; + } + } if (!(await this.queue.enqueue(msgRef))) { await msgRef.release(undefined, new Error("service unavailable")); } @@ -239,4 +257,12 @@ export class GrpcInputSource implements IInputSource, IRequireInitialization { message: error.toString(), }; } + + private isApiKeyValid(meta: Metadata) { + const headerValue = meta.get("custom-auth-header"); + if (!headerValue || headerValue.length < 1 || headerValue[0].toString() !== this.apiKey) { + return false; + } + return true; + } } diff --git a/packages/grpc/src/internal/client.ts b/packages/grpc/src/internal/client.ts index 92d922c1..90d846dd 100644 --- a/packages/grpc/src/internal/client.ts +++ b/packages/grpc/src/internal/client.ts @@ -41,6 +41,7 @@ enum GrpcMetricResult { Success = "success", Error = "error", } +const DEFAULT_API_KEY = "token"; class ClientBase implements IRequireInitialization, IDisposable { private pendingStreams: Set>; @@ -75,7 +76,8 @@ class ClientBase implements IRequireInitialization, IDisposable { export function createGrpcClient( config: IGrpcClientConfiguration & IGrpcConfiguration, - certPath?: string + certPath?: string, + apiKey?: string ): T & IDisposable & IRequireInitialization { const serviceDef = createServiceDefinition(config.definition); let client: Client; @@ -86,7 +88,7 @@ export function createGrpcClient( const metaCallback = (_params: any, callback: (arg0: null, arg1: Metadata) => void) => { const meta = new Metadata(); - meta.add("custom-auth-header", "token"); + meta.add("custom-auth-header", apiKey || DEFAULT_API_KEY); callback(null, meta); }; @@ -165,12 +167,16 @@ export function createGrpcClient( const stream = await retrier.retry((bail) => { try { + const meta = createTracingMetadata(wrapper.tracer, span); + if (!certPath && apiKey) { + meta.set("custom-auth-header", apiKey); + } return client.makeServerStreamRequest( method.path, method.requestSerialize, method.responseDeserialize, request, - createTracingMetadata(wrapper.tracer, span), + meta, callOptions() ); } catch (e) { @@ -239,12 +245,16 @@ export function createGrpcClient( return await retrier.retry(async (bail) => { try { return await new Promise((resolve, reject) => { + const meta = createTracingMetadata(wrapper.tracer, span); + if (!certPath && apiKey) { + meta.set("custom-auth-header", apiKey); + } client.makeUnaryRequest( method.path, method.requestSerialize, method.responseDeserialize, request, - createTracingMetadata(wrapper.tracer, span), + meta, callOptions(), (error, value) => { this.metrics.increment(GrpcMetrics.RequestProcessed, { From ad9d725bb76a56d689ea316c5d9f937054fb51c8 Mon Sep 17 00:00:00 2001 From: PLAMEN IVANOV Date: Wed, 29 May 2024 15:33:55 -0400 Subject: [PATCH 2/3] Address feedback --- packages/grpc/src/__test__/grpc.test.ts | 21 ++++++++----- packages/grpc/src/index.ts | 18 +++++++---- packages/grpc/src/internal/GrpcInputSource.ts | 15 ++++------ packages/grpc/src/internal/client.ts | 30 ++++++++++--------- 4 files changed, 47 insertions(+), 37 deletions(-) diff --git a/packages/grpc/src/__test__/grpc.test.ts b/packages/grpc/src/__test__/grpc.test.ts index a5160815..b1012bbd 100644 --- a/packages/grpc/src/__test__/grpc.test.ts +++ b/packages/grpc/src/__test__/grpc.test.ts @@ -22,7 +22,9 @@ import { GrpcMetadata, grpcSource, IGrpcClientConfiguration, + IGrpcClientOptions, IGrpcConfiguration, + IGrpcServerOptions, IResponseStream, } from ".."; import { sample } from "./Sample"; @@ -79,7 +81,11 @@ export const SampleServiceDefinition = { }, }; -function testApp(handler: any, host?: string, apiKey?: string): CancelablePromise { +function testApp( + handler: any, + host?: string, + options?: IGrpcServerOptions +): CancelablePromise { return Application.create() .input() .add( @@ -90,7 +96,7 @@ function testApp(handler: any, host?: string, apiKey?: string): CancelablePromis definitions: [SampleServiceDefinition], skipNoStreamingValidation: true, }, - apiKey + options ) ) .done() @@ -101,7 +107,7 @@ function testApp(handler: any, host?: string, apiKey?: string): CancelablePromis async function createClient( host?: string, config?: Partial, - apiKey?: string + options?: IGrpcClientOptions ): Promise { const client = grpcClient( { @@ -109,8 +115,7 @@ async function createClient( definition: SampleServiceDefinition, ...config, }, - undefined, - apiKey + options ); return client; } @@ -146,10 +151,10 @@ describe("gRPC source", () => { }, }, undefined, - apiKey + { apiKey } ); try { - const client = await createClient(undefined, undefined, apiKey); + const client = await createClient(undefined, undefined, { apiKey }); const response = await client.NoStreaming({ id: 15 }); expect(response).toMatchObject({ name: "15" }); } finally { @@ -278,7 +283,7 @@ describe("gRPC source", () => { }, }, undefined, - apiKey + { apiKey } ); try { const client = await createClient(); diff --git a/packages/grpc/src/index.ts b/packages/grpc/src/index.ts index 301272fe..4aed284b 100644 --- a/packages/grpc/src/index.ts +++ b/packages/grpc/src/index.ts @@ -58,6 +58,10 @@ export interface IGrpcServerConfiguration { readonly skipNoStreamingValidation?: boolean; } +export interface IGrpcServerOptions { + readonly apiKey?: string; +} + export interface IGrpcClientConfiguration { readonly endpoint: string; readonly definition: IGrpcServiceDefinition; @@ -66,6 +70,11 @@ export interface IGrpcClientConfiguration { readonly behavior?: Required; } +export interface IGrpcClientOptions { + readonly certPath?: string; + readonly apiKey?: string; +} + export enum GrpcMetadata { OperationPath = "grpc.OperationPath", ResponseStream = "grpc.ResponseStream", @@ -81,7 +90,7 @@ export interface IResponseStream { export function grpcSource( configuration: IGrpcServerConfiguration & IGrpcConfiguration, - apiKey?: string + options?: IGrpcServerOptions ): IInputSource & IRequireInitialization { configuration = config.parse( GrpcSourceConfiguration, @@ -91,7 +100,7 @@ export function grpcSource( allocator: Buffer, } ); - return new GrpcInputSource(configuration, apiKey); + return new GrpcInputSource(configuration, options); } export function grpcMsg(operation: IGrpcServiceMethod, request: any): IMessage { @@ -103,8 +112,7 @@ export function grpcMsg(operation: IGrpcServiceMethod, request: any): IMessage { export function grpcClient( configuration: IGrpcClientConfiguration & IGrpcConfiguration, - certPath?: string, - apiKey?: string + options?: IGrpcClientOptions ): T & IRequireInitialization & IDisposable { configuration = config.parse( GrpcClientConfiguration, @@ -124,5 +132,5 @@ export function grpcClient( }, } ); - return createGrpcClient(configuration, certPath, apiKey); + return createGrpcClient(configuration, options); } diff --git a/packages/grpc/src/internal/GrpcInputSource.ts b/packages/grpc/src/internal/GrpcInputSource.ts index fe7c4d18..e62590b3 100644 --- a/packages/grpc/src/internal/GrpcInputSource.ts +++ b/packages/grpc/src/internal/GrpcInputSource.ts @@ -39,7 +39,7 @@ import { GrpcResponseStream, GrpcStreamHandler, } from "."; -import { GrpcMetadata, IGrpcConfiguration, IGrpcServerConfiguration } from ".."; +import { GrpcMetadata, IGrpcConfiguration, IGrpcServerConfiguration, IGrpcServerOptions } from ".."; import { GrpcOpenTracingTagKeys } from "./helper"; enum GrpcMetrics { @@ -59,11 +59,10 @@ export class GrpcInputSource implements IInputSource, IRequireInitialization { private logger: ILogger; private tracer: Tracer; private metrics: IMetrics; - private apiKey: string; constructor( private readonly config: IGrpcServerConfiguration & IGrpcConfiguration, - apiKey?: string + private readonly options?: IGrpcServerOptions ) { if (!config.skipNoStreamingValidation) { for (const def of config.definitions) { @@ -84,7 +83,6 @@ export class GrpcInputSource implements IInputSource, IRequireInitialization { this.logger = DefaultComponentContext.logger; this.tracer = DefaultComponentContext.tracer; this.metrics = DefaultComponentContext.metrics; - this.apiKey = apiKey; } public async initialize(context: IComponentContext): Promise { @@ -208,7 +206,7 @@ export class GrpcInputSource implements IInputSource, IRequireInitialization { path: method.path, }); }); - if (this.apiKey) { + if (this.options?.apiKey) { if (!this.isApiKeyValid(call.metadata)) { await msgRef.release( undefined, @@ -259,10 +257,7 @@ export class GrpcInputSource implements IInputSource, IRequireInitialization { } private isApiKeyValid(meta: Metadata) { - const headerValue = meta.get("custom-auth-header"); - if (!headerValue || headerValue.length < 1 || headerValue[0].toString() !== this.apiKey) { - return false; - } - return true; + const headerValue = meta.get("authorization"); + return headerValue?.[0]?.toString() === this.options.apiKey; } } diff --git a/packages/grpc/src/internal/client.ts b/packages/grpc/src/internal/client.ts index 90d846dd..2799d4c1 100644 --- a/packages/grpc/src/internal/client.ts +++ b/packages/grpc/src/internal/client.ts @@ -30,7 +30,7 @@ import { import { FORMAT_HTTP_HEADERS, Span, SpanContext, Tags, Tracer } from "opentracing"; import { performance } from "perf_hooks"; import { createGrpcConfiguration, createServiceDefinition } from "."; -import { IGrpcClientConfiguration, IGrpcConfiguration } from ".."; +import { IGrpcClientConfiguration, IGrpcClientOptions, IGrpcConfiguration } from ".."; enum GrpcMetrics { RequestSent = "cookie_cutter.grpc_client.request_sent", @@ -41,7 +41,6 @@ enum GrpcMetricResult { Success = "success", Error = "error", } -const DEFAULT_API_KEY = "token"; class ClientBase implements IRequireInitialization, IDisposable { private pendingStreams: Set>; @@ -76,24 +75,27 @@ class ClientBase implements IRequireInitialization, IDisposable { export function createGrpcClient( config: IGrpcClientConfiguration & IGrpcConfiguration, - certPath?: string, - apiKey?: string + options?: IGrpcClientOptions ): T & IDisposable & IRequireInitialization { const serviceDef = createServiceDefinition(config.definition); let client: Client; const ClientType = makeGenericClientConstructor(serviceDef, undefined, undefined); + const certPath = options?.certPath; + const apiKey = options?.apiKey; if (certPath) { const rootCert = readFileSync(certPath); const channelCreds = credentials.createSsl(rootCert); - const metaCallback = (_params: any, callback: (arg0: null, arg1: Metadata) => void) => { - const meta = new Metadata(); - meta.add("custom-auth-header", apiKey || DEFAULT_API_KEY); - callback(null, meta); - }; - - const callCreds = credentials.createFromMetadataGenerator(metaCallback); - const combCreds = credentials.combineChannelCredentials(channelCreds, callCreds); + let combCreds = channelCreds; + if (apiKey) { + const metaCallback = (_params: any, callback: (arg0: null, arg1: Metadata) => void) => { + const meta = new Metadata(); + meta.add("authorization", apiKey); + callback(null, meta); + }; + const callCreds = credentials.createFromMetadataGenerator(metaCallback); + combCreds = credentials.combineChannelCredentials(channelCreds, callCreds); + } client = new ClientType(config.endpoint, combCreds, createGrpcConfiguration(config)); } else { client = new ClientType( @@ -169,7 +171,7 @@ export function createGrpcClient( try { const meta = createTracingMetadata(wrapper.tracer, span); if (!certPath && apiKey) { - meta.set("custom-auth-header", apiKey); + meta.set("authorization", apiKey); } return client.makeServerStreamRequest( method.path, @@ -247,7 +249,7 @@ export function createGrpcClient( return await new Promise((resolve, reject) => { const meta = createTracingMetadata(wrapper.tracer, span); if (!certPath && apiKey) { - meta.set("custom-auth-header", apiKey); + meta.set("authorization", apiKey); } client.makeUnaryRequest( method.path, From d09ea324e218e12048caf087eb61279d54d373e4 Mon Sep 17 00:00:00 2001 From: PLAMEN IVANOV Date: Wed, 29 May 2024 17:31:28 -0400 Subject: [PATCH 3/3] Address more comments --- packages/grpc/src/__test__/grpc.test.ts | 2 +- packages/grpc/src/index.ts | 7 +++++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/packages/grpc/src/__test__/grpc.test.ts b/packages/grpc/src/__test__/grpc.test.ts index b1012bbd..0dc000dc 100644 --- a/packages/grpc/src/__test__/grpc.test.ts +++ b/packages/grpc/src/__test__/grpc.test.ts @@ -107,7 +107,7 @@ function testApp( async function createClient( host?: string, config?: Partial, - options?: IGrpcClientOptions + options?: string | IGrpcClientOptions ): Promise { const client = grpcClient( { diff --git a/packages/grpc/src/index.ts b/packages/grpc/src/index.ts index 4aed284b..708a17d8 100644 --- a/packages/grpc/src/index.ts +++ b/packages/grpc/src/index.ts @@ -112,7 +112,7 @@ export function grpcMsg(operation: IGrpcServiceMethod, request: any): IMessage { export function grpcClient( configuration: IGrpcClientConfiguration & IGrpcConfiguration, - options?: IGrpcClientOptions + certPathOrOptions?: string | IGrpcClientOptions ): T & IRequireInitialization & IDisposable { configuration = config.parse( GrpcClientConfiguration, @@ -132,5 +132,8 @@ export function grpcClient( }, } ); - return createGrpcClient(configuration, options); + if (typeof certPathOrOptions === "string") { + certPathOrOptions = { certPath: certPathOrOptions }; + } + return createGrpcClient(configuration, certPathOrOptions); }