Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added kafka trigger #225

Open
wants to merge 1 commit into
base: v4.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
HttpHandler,
HttpMethod,
HttpMethodFunctionOptions,
KafkaFunctionOptions,
ServiceBusQueueFunctionOptions,
ServiceBusTopicFunctionOptions,
StorageBlobFunctionOptions,
Expand Down Expand Up @@ -124,6 +125,10 @@ export function cosmosDB(name: string, options: CosmosDBFunctionOptions): void {
generic(name, convertToGenericOptions(options, <any>trigger.cosmosDB));
}

export function kafka(name: string, options: KafkaFunctionOptions): void {
generic(name, convertToGenericOptions(options, trigger.kafka));
}

export function warmup(name: string, options: WarmupFunctionOptions): void {
generic(name, convertToGenericOptions(options, trigger.warmup));
}
Expand Down
9 changes: 9 additions & 0 deletions src/trigger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import {
GenericTriggerOptions,
HttpTrigger,
HttpTriggerOptions,
KafkaTrigger,
KafkaTriggerOptions,
ServiceBusQueueTrigger,
ServiceBusQueueTriggerOptions,
ServiceBusTopicTrigger,
Expand Down Expand Up @@ -92,6 +94,13 @@ export function cosmosDB(options: CosmosDBTriggerOptions): CosmosDBTrigger {
});
}

export function kafka(options: KafkaTriggerOptions): KafkaTrigger {
return addTriggerBindingName({
...options,
type: 'kafkaTrigger',
});
}

export function warmup(options: WarmupTriggerOptions): WarmupTrigger {
return addTriggerBindingName({
...options,
Expand Down
8 changes: 8 additions & 0 deletions types/app.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { EventGridFunctionOptions } from './eventGrid';
import { EventHubFunctionOptions } from './eventHub';
import { GenericFunctionOptions } from './generic';
import { HttpFunctionOptions, HttpHandler, HttpMethodFunctionOptions } from './http';
import { KafkaFunctionOptions } from './kafka';
import { ServiceBusQueueFunctionOptions, ServiceBusTopicFunctionOptions } from './serviceBus';
import { SetupOptions } from './setup';
import { StorageBlobFunctionOptions, StorageQueueFunctionOptions } from './storage';
Expand Down Expand Up @@ -151,6 +152,13 @@ export function eventGrid(name: string, options: EventGridFunctionOptions): void
*/
export function cosmosDB(name: string, options: CosmosDBFunctionOptions): void;

/**
* Registers a kafka function in your app that will be triggered whenever a message is added to a kafka topic
* @param name The name of the function. The name must be unique within your app and will mostly be used for your own tracking purposes
* @param options Configuration options describing the inputs, outputs, and handler for this function
*/
export function kafka(name: string, options: KafkaFunctionOptions): void;

/**
* Registers a function in your app that will be triggered when an instance is added to scale a running function app.
* The warmup trigger is only called during scale-out operations, not during restarts or other non-scale startups.
Expand Down
1 change: 1 addition & 0 deletions types/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ export * from './hooks/invocationHooks';
export * from './http';
export * as input from './input';
export * from './InvocationContext';
export * from './kafka';
export * as output from './output';
export * from './serviceBus';
export * from './setup';
Expand Down
92 changes: 92 additions & 0 deletions types/kafka.d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License.

import { FunctionOptions, FunctionResult, FunctionTrigger, RetryOptions } from './index';
import { InvocationContext } from './InvocationContext';

export type KafkaHandler = (messages: unknown, context: InvocationContext) => FunctionResult;

export interface KafkaFunctionOptions extends KafkaTriggerOptions, Partial<FunctionOptions> {
handler: KafkaHandler;

trigger?: KafkaTrigger;

/**
* An optional retry policy to rerun a failed execution until either successful completion occurs or the maximum number of retries is reached.
* Learn more [here](https://learn.microsoft.com/azure/azure-functions/functions-bindings-error-pages)
*/
retry?: RetryOptions;
}

export interface KafkaTriggerOptions {
/**
* The list of Kafka brokers monitored by the trigger.
*/
brokerList: [];

/**
* The topic monitored by the trigger.
*/
topic: string;

/**
* Indicates the cardinality of the trigger input. The supported values are ONE (default) and MANY. Use ONE when the input is a single message and MANY when the input is an array of messages. When you use MANY, you must also set a dataType.
*/
cardinality?: 'ONE' | 'MANY';

/**
* Defines how Functions handles the parameter value. By default, the value is obtained as a string and Functions tries to deserialize the string to actual plain-old Java object (POJO). When string, the input is treated as just a string. When binary, the message is received as binary data, and Functions tries to deserialize it to an actual parameter type byte[].
*/
dataType?: 'string' | 'binary';

/**
* Kafka consumer group used by the trigger.
*/
consumerGroup?: string;

/**
* Schema of a generic record when using the Avro protocol.
*/
avroSchema?: string;

/**
* The authentication mode when using Simple Authentication and Security Layer (SASL) authentication. The supported values are Gssapi, Plain (default), ScramSha256, ScramSha512.
*/
authenticationMode?: 'Plain' | 'Gssapi' | 'ScramSha256' | 'ScramSha512';

/**
* The username for SASL authentication. Not supported when AuthenticationMode is Gssapi.
*/
username?: string;

/**
* The password for SASL authentication. Not supported when AuthenticationMode is Gssapi.
*/
password?: string;

/**
* The security protocol used when communicating with brokers. The supported values are plaintext (default), ssl, sasl_plaintext, sasl_ssl.
*/
protocol?: 'plaintext' | 'ssl' | 'sasl_plaintext' | 'sasl_ssl';

/**
* Path to CA certificate file for verifying the broker's certificate.
*/
sslCaLocation?: string;

/**
* Path to client's certificate file.
*/
sslCertificateLocation?: string;

/**
* Path to client's private key (PEM) used for authentication.
*/
sslKeyLocation?: string;

/**
* Password for client's certificate.
*/
sslKeyPassword?: string;
}
export type KafkaTrigger = FunctionTrigger & KafkaTriggerOptions;
6 changes: 6 additions & 0 deletions types/trigger.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { EventHubTrigger, EventHubTriggerOptions } from './eventHub';
import { GenericTriggerOptions } from './generic';
import { HttpTrigger, HttpTriggerOptions } from './http';
import { FunctionTrigger } from './index';
import { KafkaTrigger, KafkaTriggerOptions } from './kafka';
import {
ServiceBusQueueTrigger,
ServiceBusQueueTriggerOptions,
Expand Down Expand Up @@ -67,6 +68,11 @@ export function eventGrid(options: EventGridTriggerOptions): EventGridTrigger;
*/
export function cosmosDB(options: CosmosDBTriggerOptions): CosmosDBTrigger;

/**
* [Link to docs and examples](https://learn.microsoft.com/en-us/azure/azure-functions/functions-bindings-kafka-trigger&pivots=programming-language-javascript)
*/
export function kafka(options: KafkaTriggerOptions): KafkaTrigger;

/**
* [Link to docs and examples](https://learn.microsoft.com/azure/azure-functions/functions-bindings-warmup?tabs=isolated-process&pivots=programming-language-javascript)
*/
Expand Down