A powerful and flexible Event Bus built on top of RabbitMQ, enabling reliable event-driven communication between multiple Node.js services. This package supports advanced features such as message persistence, retries, dead-letter queues, and idempotency.
- Reliable Event Publishing and Consumption: Ensures message delivery with options for persistence and mandatory routing.
- Flexible Subscription: Supports multiple routing keys and handlers for a single queue.
- Dead Letter Queue (DLQ) Support: Captures failed messages for analysis or retries.
- Retry Mechanism: Automatically retries failed messages with configurable delays.
- Idempotency Handling: Avoids duplicate processing of the same message.
- Dynamic Configuration: Fully customizable message stores, retry logic, and connection options.
npm install @nodesandbox/event-bus
Create an instance of the RabbitMQEventBus
with the necessary configuration.
import { RabbitMQEventBus } from '@nodesandbox/event-bus';
const eventBus = new RabbitMQEventBus({
connection: {
url: 'amqp://localhost:5672',
exchange: 'events',
exchangeType: 'topic',
deadLetterExchange: 'events.dlx',
retryExchange: 'events.retry',
},
producer: {
persistent: true,
mandatory: true,
},
consumer: {
prefetch: 10,
autoAck: false,
},
});
await eventBus.init();
console.log('RabbitMQ Event Bus initialized successfully.');
Publish events to RabbitMQ using the publish
method. Events follow the CloudEvents specification.
const event = {
type: 'order.created',
data: { orderId: '1234', userId: 'user001' },
metadata: {
id: 'event-id-1234',
source: 'order-service',
time: new Date().toISOString(),
},
};
await eventBus.publish(event);
console.log('Event published:', event);
Subscribe to events using the subscribe
method and provide a handler function.
await eventBus.subscribe(
['order.created', 'order.updated'],
async (event) => {
console.log('Received event:', event);
// Handle event
},
{ queue: 'order-queue' } // Optional custom queue name
);
Unsubscribe a consumer using its consumer tag.
await eventBus.unsubscribe('consumer-tag');
console.log('Unsubscribed successfully.');
Gracefully close the Event Bus, including all active connections and channels.
await eventBus.close();
console.log('RabbitMQ Event Bus closed.');
Capture failed messages using DLQ:
await eventBus.subscribe(
['events.dlx'],
async (event) => {
console.warn('Dead Letter Message:', event);
// Handle DLQ message
},
{ queue: 'dlq-queue' }
);
Customize retry delays and maximum attempts:
const eventBus = new RabbitMQEventBus({
connection: {
url: 'amqp://localhost:5672',
exchange: 'events',
},
producer: {
persistent: true,
},
consumer: {
prefetch: 10,
autoAck: false,
},
retryOptions: {
maxRetries: 5,
retryDelays: [1000, 5000, 10000], // Retry after 1s, 5s, and 10s
},
});
The IdempotencyStore
ensures that duplicate messages are not processed:
eventBus.useIdempotencyStore(new CustomIdempotencyStore());
class CustomIdempotencyStore {
private processedMessages = new Map();
async markAsProcessed(messageId, result) {
this.processedMessages.set(messageId, { result });
}
async hasBeenProcessed(messageId) {
return this.processedMessages.has(messageId);
}
}
Option | Description | Default |
---|---|---|
connection.url |
The RabbitMQ connection URL. | amqp://localhost:5672 |
connection.exchange |
Name of the exchange. | events |
connection.exchangeType |
Type of the exchange (topic , direct , fanout ). |
topic |
producer.persistent |
Ensure messages are persistent. | true |
producer.mandatory |
Return unroutable messages to the publisher. | true |
consumer.prefetch |
Number of messages to prefetch. | 10 |
consumer.autoAck |
Automatically acknowledge messages. | false |
Publishes an event to RabbitMQ.
- event: The CloudEvent to be published.
- options: Additional AMQP options (e.g., headers, expiration).
Subscribes to one or more event types.
- eventTypes: An array of event types or a single event type.
- handler: A function to handle incoming events.
- options.queue: The name of the queue.
Unsubscribes a consumer by its tag.
Closes the connection and all active channels.
- Use Persistent Messages: Ensure critical messages are not lost by enabling persistence.
- Implement DLQs: Capture and analyze failed messages for debugging or retrying.
- Use Idempotency: Prevent duplicate processing of messages.
- Monitor RabbitMQ: Leverage RabbitMQ's monitoring tools to track queue states and message flow.
To see the package in action, we created a sample project simulating an e-commerce system with multiple microservices communicating via this Event Bus.
-
Order Service:
- Publishes events related to order creation (
order.created
) and updates. - Interacts with Inventory and Payment Services to coordinate order processing.
- Publishes events related to order creation (
-
Inventory Service:
- Subscribes to stock-related events like
stock.check
andstock.reserved
. - Publishes responses (
stock.check.response
) and manages inventory updates.
- Subscribes to stock-related events like
-
Payment Service:
- Handles payment-related events such as
payment.initiated
and publishes results (payment.succeeded
orpayment.failed
).
- Handles payment-related events such as
-
Notification Service:
- Subscribes to multiple events (
order.created
,order.completed
,payment.succeeded
, etc.) to send notifications.
- Subscribes to multiple events (
Each service is a separate Node.js server using this package for communication. The project demonstrates reliable event flow, retries, and error handling.
-
Clone the sample project repository:
git clone https://github.com/nodesandbox/event-bus-sample.git cd event-bus-sample
-
Install dependencies:
npm install
-
Start RabbitMQ: Ensure RabbitMQ is running, either locally or via Docker:
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management
-
Start the services: Launch each service in a separate terminal:
# Order Service cd order-service npm start # Inventory Service cd inventory-service npm start # Payment Service cd payment-service npm start # Notification Service cd notification-service npm start
-
Test the system: Use Postman, Curl, or any HTTP client to simulate creating an order:
curl -X POST http://localhost:3001/orders \ -H "Content-Type: application/json" \ -d '{ "userId": "user123", "items": [ { "productId": "PROD1", "quantity": 2, "price": 19.99 }, { "productId": "PROD2", "quantity": 1, "price": 9.99 } ] }'
Check the logs of all services to observe event publication and handling.
-
Monitor with RabbitMQ Management UI: Access the RabbitMQ management interface at http://localhost:15672/. Use
guest/guest
as login credentials to view exchanges, queues, and messages.
Find the full source code for the sample project here:
https://github.com/nodesandbox/event-bus-sample
We welcome contributions! Please follow these steps:
- Fork the repository.
- Create a new branch.
- Submit a pull request with your changes.
This package is licensed under the MIT License. See LICENSE for more details.