Skip to content

Commit

Permalink
feat(kafka): implements parser for kafka
Browse files Browse the repository at this point in the history
There is currently a bug with the integration test for Kafka due to a heartbeat still firing after
disconnection. There is an [open issue](nestjs/nest#4830) in Nest's repo
about this at the moment.

fix #17
  • Loading branch information
jmcdo29 committed May 24, 2020
1 parent 03dcc0b commit 4ce590f
Show file tree
Hide file tree
Showing 21 changed files with 299 additions and 3 deletions.
26 changes: 25 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,31 @@ services:
container_name: rabbit
ports:
- "5672:5672"
# kafka:
zookeeper:
container_name: kafka-zookeeper
hostname: zookeeper
image: confluentinc/cp-zookeeper:5.3.2
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka:
container_name: kafka
hostname: kafka
image: confluentinc/cp-kafka:5.3.2
depends_on:
- zookeeper
ports:
- "29092:29092"
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
# grpc:
nats:
image: nats
Expand Down
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
10 changes: 10 additions & 0 deletions integration/src/kafka/client/exception.filter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import { ArgumentsHost, Catch, HttpException } from '@nestjs/common';
import { BaseExceptionFilter } from '@nestjs/core';

@Catch()
export class ExceptionFilter extends BaseExceptionFilter {
catch(exception: HttpException, host: ArgumentsHost) {
const res = host.switchToHttp().getResponse();
res.send(exception);
}
}
41 changes: 41 additions & 0 deletions integration/src/kafka/client/kafka-client.controller.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import {
Controller,
Get,
Inject,
OnModuleDestroy,
OnModuleInit,
UseFilters,
} from '@nestjs/common';
import { ClientKafka } from '@nestjs/microservices';
import { ExceptionFilter } from './exception.filter';

@Controller()
export class KafkaClientController implements OnModuleInit, OnModuleDestroy {
constructor(@Inject('KAFKA_SERVICE') private readonly kafka: ClientKafka) {}

async onModuleInit() {
['hello', 'error', 'skip'].forEach((key) =>
this.kafka.subscribeToResponseOf(`say.${key}`),
);
}

onModuleDestroy() {
this.kafka.close();
}

@Get()
sayHello() {
return this.kafka.send('say.hello', { ip: '127.0.0.1' });
}

@Get('error')
@UseFilters(ExceptionFilter)
sayError() {
return this.kafka.send('say.error', { ip: '127.0.0.1' });
}

@Get('skip')
saySkip() {
return this.kafka.send('say.skip', { ip: '127.0.0.1' });
}
}
25 changes: 25 additions & 0 deletions integration/src/kafka/client/kafka-client.module.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import { Module } from '@nestjs/common';
import { ClientsModule, Transport } from '@nestjs/microservices';
import { KafkaClientController } from './kafka-client.controller';

@Module({
imports: [
ClientsModule.register([
{
name: 'KAFKA_SERVICE',
transport: Transport.KAFKA,
options: {
client: {
clientId: 'client',
brokers: ['localhost:9092'],
},
consumer: {
groupId: 'client-consumer',
},
},
},
]),
],
controllers: [KafkaClientController],
})
export class KafkaClientModule {}
10 changes: 10 additions & 0 deletions integration/src/kafka/server/exception.filter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import { Catch, HttpException } from '@nestjs/common';
import { BaseRpcExceptionFilter } from '@nestjs/microservices';
import { Observable, throwError } from 'rxjs';

@Catch()
export class ExceptionFilter extends BaseRpcExceptionFilter {
catch(exception: HttpException): Observable<any> {
return throwError(exception.message);
}
}
27 changes: 27 additions & 0 deletions integration/src/kafka/server/kafka-server.controller.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import { BadRequestException, Controller, UseFilters } from '@nestjs/common';
import { MessagePattern } from '@nestjs/microservices';
import { OgmaSkip } from '@ogma/nestjs-module';
import { AppService } from '../../app.service';
import { ExceptionFilter } from './exception.filter';

@Controller()
export class KafkaServerController {
constructor(private readonly service: AppService) {}

@MessagePattern('say.hello')
sayHello() {
return this.service.getHello();
}

@UseFilters(ExceptionFilter)
@MessagePattern('say.error')
sayError() {
throw new BadRequestException('Borked');
}

@OgmaSkip()
@MessagePattern('say.skip')
saySkip() {
return this.service.getHello();
}
}
9 changes: 9 additions & 0 deletions integration/src/kafka/server/kafka-server.module.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import { Module } from '@nestjs/common';
import { AppService } from '../../app.service';
import { KafkaServerController } from './kafka-server.controller';

@Module({
controllers: [KafkaServerController],
providers: [AppService],
})
export class KafkaServerModule {}
3 changes: 3 additions & 0 deletions integration/test/grpc.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
describe('gRPC test', () => {
it.todo('implement test');
});
93 changes: 93 additions & 0 deletions integration/test/kafka.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
import { INestApplication, INestMicroservice } from '@nestjs/common';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';
import { Test } from '@nestjs/testing';
import { color } from '@ogma/logger';
import { OgmaInterceptor, OgmaModule } from '@ogma/nestjs-module';
import { KafkaParser } from '@ogma/platform-kafka';
import { KafkaClientModule } from '../src/kafka/client/kafka-client.module';
import { KafkaServerModule } from '../src/kafka/server/kafka-server.module';
import { getInterceptor, hello, httpPromise } from './utils';

describe('kafka test', () => {
let interceptor: OgmaInterceptor;
let server: INestMicroservice;
let client: INestApplication;

beforeAll(async () => {
const serverModRef = await Test.createTestingModule({
imports: [
KafkaServerModule,
OgmaModule.forRoot({
interceptor: {
rpc: KafkaParser,
},
}),
],
}).compile();
server = serverModRef.createNestMicroservice<MicroserviceOptions>({
transport: Transport.KAFKA,
options: {
client: {
brokers: ['localhost:9092'],
},
},
});
interceptor = getInterceptor(server);
await server.listenAsync();
const clientModRef = await Test.createTestingModule({
imports: [KafkaClientModule],
}).compile();
client = clientModRef.createNestApplication();
await client.listen(0);
});

afterAll(async () => {
await client.close();
await server.close();
});

describe('server calls', () => {
let logSpy: jest.SpyInstance;
let baseUrl: string;

beforeAll(async () => {
baseUrl = await client.getUrl();
});

beforeEach(() => {
logSpy = jest.spyOn(interceptor, 'log');
});

afterEach(() => {
logSpy.mockClear();
});

it.each`
url | status | endpoint
${'/'} | ${color.green(200)} | ${'say.hello'}
${'/error'} | ${color.red(500)} | ${'say.error'}
`(
'$url call',
async ({
url,
status,
endpoint,
}: {
url: string;
status: string;
endpoint: string;
}) => {
await httpPromise(baseUrl + url);
expect(logSpy).toBeCalledTimes(1);
const logObject = logSpy.mock.calls[0][0];
expect(logObject).toBeALogObject('Kafka', endpoint, 'kafka', status);
},
);

it('should call the skip but not log it', async () => {
const data = await httpPromise(baseUrl + '/skip');
expect(logSpy).toHaveBeenCalledTimes(0);
expect(data).toEqual(hello);
});
});
});
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@
"jest": "^25.1.0",
"lerna": "^3.20.2",
"lint-staged": "^10.0.8",
"kafkajs": "^1.12.0",
"morgan": "^1.10.0",
"mqtt": "^4.0.1",
"nats": "^1.4.9",
Expand Down
9 changes: 7 additions & 2 deletions packages/platform-kafka/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,11 @@
"url": "git+https://github.com/jmcdo29/ogma.git"
},
"scripts": {
"test": "echo \"No tests to run\""
"prebuild": "rimraf lib",
"build": "tsc -p tsconfig.build.json",
"postbuild": "mv ./lib/src/* ./lib && rmdir lib/src",
"test": "jest",
"test:cov": "jest --coverage"
},
"bugs": {
"url": "https://github.com/jmcdo29/ogma/issues"
Expand All @@ -39,6 +43,7 @@
"rxjs": "^6.5.4"
},
"peerDependencies": {
"@ogma/nestjs-module": "^0.1.0"
"@ogma/nestjs-module": "^0.1.0",
"kafkajs": "^1.12.0"
}
}
1 change: 1 addition & 0 deletions packages/platform-kafka/src/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from './kafka-interceptor.service';
40 changes: 40 additions & 0 deletions packages/platform-kafka/src/kafka-interceptor.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import { ExecutionContext, Injectable } from '@nestjs/common';
import { KafkaContext } from '@nestjs/microservices';
import { AbstractInterceptorService } from '@ogma/nestjs-module';

@Injectable()
export class KafkaParser extends AbstractInterceptorService {
getCallPoint(context: ExecutionContext) {
return this.getClient(context).getTopic();
}

getCallerIp(context: ExecutionContext) {
const data = this.getData(context);
return data?.value?.ip || '';
}

getMethod() {
return 'Kafka';
}

getProtocol() {
return 'kafka';
}

getStatus(
context: ExecutionContext,
inColor: boolean,
error?: Error | ExecutionContext,
): string {
const status = error ? 500 : 200;
return inColor ? this.wrapInColor(status) : status.toString();
}

private getClient(context: ExecutionContext): KafkaContext {
return context.switchToRpc().getContext<KafkaContext>();
}

private getData(context: ExecutionContext): any {
return context.switchToRpc().getData();
}
}
Empty file.
7 changes: 7 additions & 0 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -7760,6 +7760,13 @@ jsprim@^1.2.2:
json-schema "0.2.3"
verror "1.10.0"

kafkajs@^1.12.0:
version "1.12.0"
resolved "https://registry.yarnpkg.com/kafkajs/-/kafkajs-1.12.0.tgz#50ad336baee95f3324af8ae8df6fadc96e07c613"
integrity sha512-Izkd9iFRgeeKaHEgVpGQH08ygzCbHSxTbnu8W3G3uiNaVjGibUTmTwjv1Qf2M8NORXcPfzwVyg6bBlVj4SKr9g==
dependencies:
long "^4.0.0"

kind-of@^3.0.2, kind-of@^3.0.3, kind-of@^3.2.0:
version "3.2.2"
resolved "https://registry.yarnpkg.com/kind-of/-/kind-of-3.2.2.tgz#31ea21a734bab9bbb0f32466d893aea51e4a3c64"
Expand Down

0 comments on commit 4ce590f

Please sign in to comment.