Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Support Redis Importer #656

Merged
merged 4 commits into from
Jan 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 40 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ A monitoring application for [Zeebe](https://zeebe.io). It is designed for devel
* test workflows manually
* provide insides on how workflows are executed

The application imports the data from Zeebe using the [Hazelcast exporter](https://github.com/camunda-community-hub/zeebe-hazelcast-exporter) or [Kafka exporter](https://github.com/camunda-community-hub/zeebe-kafka-exporter). It aggregates the data and stores it into a database. The data is displayed on server-side rendered HTML pages.
The application imports the data from Zeebe using the [Hazelcast exporter](https://github.com/camunda-community-hub/zeebe-hazelcast-exporter), [Kafka exporter](https://github.com/camunda-community-hub/zeebe-kafka-exporter) or [Redis exporter](https://github.com/camunda-community-hub/zeebe-redis-exporter). It aggregates the data and stores it into a database. The data is displayed on server-side rendered HTML pages.

![how-it-works](docs/how-it-works.png)

Expand Down Expand Up @@ -59,21 +59,35 @@ By default, the Zeebe Simple Monitor imports Zeebe events through Hazelcast, but
* In order to import events efficiently and quickly, Zeebe brokers partitions and Kafka topic partitions should be correlated in a special way: [reference to the exporter docs](https://github.com/camunda-community-hub/zeebe-kafka-exporter?tab=readme-ov-file#partitioning)
* Configure the environment variables in the Zeebe Simple Monitor as described in the "[Change the default Zeebe importer to Kafka](#change-the-default-zeebe-importer-to-kafka)" section

**Switch to the Redis exporter/importer**

* Ensure that a Zeebe broker is running with a [Redis exporter](https://github.com/camunda-community-hub/zeebe-redis-exporter)
* Adjust the following environment variables in Zeebe:
```
- ZEEBE_REDIS_REMOTE_ADDRESS=redis://redis:6379
- ZEEBE_REDIS_MAX_TIME_TO_LIVE_IN_SECONDS=900
- ZEEBE_REDIS_DELETE_AFTER_ACKNOWLEDGE=true
```
* Configure the connection to the Zeebe broker by setting `zeebe.client.broker.gateway-address` (default: `localhost:26500`)
* Configure the connection to Redis by setting `zeebe.client.worker.redis.connection` (default: `redis://localhost:6379`)
* Activate Redis by setting `zeebe-importer: redis`


If the Zeebe broker runs on your local machine with the default configs then start the container with the following command:

```
docker run --network="host" ghcr.io/camunda-community-hub/zeebe-simple-monitor:2.4.1
```

For a local setup, the repository contains a [docker-compose file](docker/docker-compose.yml). It starts a Zeebe broker with the Hazelcast/Kafka exporter and the application.
For a local setup, the repository contains a [docker-compose file](docker/docker-compose.yml). It starts a Zeebe broker with the Hazelcast/Kafka/Redis exporter and the application.
There are several Docker Compose profiles, setting by a file [.env](docker/.env), by passing multiple --profile flags or a comma-separated list for the COMPOSE_PROFILES environment variable:
* ```docker compose --profile hazelcast --profile hazelcast_in_memory up```
* ```COMPOSE_PROFILES=hazelcast,hazelcast_in_memory docker compose up```

Existing presets:
* ```COMPOSE_PROFILES=hazelcast,hazelcast_in_memory``` (by default)
* ```COMPOSE_PROFILES=kafka,kafka_in_memory```
* ```COMPOSE_PROFILES=redis,redis_in_memory```
* ```COMPOSE_PROFILES=hazelcast,hazelcast_postgres,postgres```
* ```COMPOSE_PROFILES=hazelcast,hazelcast_mysql,mysql```

Expand All @@ -89,6 +103,7 @@ Go to http://localhost:8082
To change the database see "[Change the Database](#change-the-database)"

To change Zeebe importer see "[Change the default Zeebe importer to Kafka](#change-the-default-zeebe-importer-to-kafka)"
or "[Change the default Zeebe importer to Redis](#change-the-default-zeebe-importer-to-redis)"

```
docker-compose --profile postgres up
Expand Down Expand Up @@ -252,7 +267,29 @@ See the [docker-compose file](docker/docker-compose.yml) for a sample configurat
* `spring.kafka.custom.concurrency` (default: `3`) is the number of threads for the Kafka listener that will import events from Zeebe
* `spring.kafka.custom.retry.intervalMs` (default: `30000`) and `spring.kafka.custom.retry.max-attempts` (default: `3`) are the retry configurations for a retryable exception in the listener

Refer to the [docker-compose file](docker/docker-compose.yml) for a sample configuration with the Kafka importer. Profiles presets: `kafka,kafka_in_memory`
Refer to the [docker-compose file](docker/docker-compose.yml) for a sample configuration with the Kafka importer. Profile presets: `kafka,kafka_in_memory`

#### Change the default Zeebe importer to Redis

* set the `zeebe-importer` (default: `hazelcast`) configuration property to `redis`
* adjust the importer settings under `zeebe.client.worker.redis` (complete default values below):
```
zeebe:
client:
broker.gatewayAddress: 127.0.0.1:26500
security.plaintext: true
worker:
redis:
connection: redis://localhost:6379
consumer-group: simple-monitor
xread-count: 500
xread-block-millis: 2000
zeebe-importer: redis
```

Refer to the [docker-compose file](docker/docker-compose.yml) for a sample configuration with the Redis importer. Profile presets: `redis,redis_in_memory`

## Code of Conduct

Expand Down
43 changes: 43 additions & 0 deletions docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,23 @@ services:
profiles:
- kafka

zeebe-redis:
container_name: zeebe-broker-redis
image: ghcr.io/camunda-community-hub/zeebe-with-redis-exporter:8.4.0
environment:
- ZEEBE_REDIS_REMOTE_ADDRESS=redis://redis:6379
- ZEEBE_REDIS_MAX_TIME_TO_LIVE_IN_SECONDS=900
- ZEEBE_REDIS_DELETE_AFTER_ACKNOWLEDGE=true
ports:
- "26500:26500"
- "9600:9600"
networks:
- zeebe_network
volumes:
- ./redis/application.yaml:/usr/local/zeebe/config/application.yaml
profiles:
- redis

zookeeper:
image: docker.io/bitnami/zookeeper:3.8
ports:
Expand Down Expand Up @@ -73,6 +90,16 @@ services:
profiles:
- kafka

redis:
container_name: redis_cache
image: redis:7-alpine
ports:
- "6379:6379"
networks:
- zeebe_network
profiles:
- redis

simple-monitor-in-memory:
container_name: zeebe-simple-monitor-in-memory
image: ghcr.io/camunda-community-hub/zeebe-simple-monitor:2.5.2
Expand Down Expand Up @@ -106,6 +133,22 @@ services:
profiles:
- kafka_in_memory

simple-monitor-in-memory-redis:
container_name: zeebe-simple-monitor-in-memory-redis
image: ghcr.io/camunda-community-hub/zeebe-simple-monitor:2.6.4
environment:
- zeebe.client.broker.gateway-address=zeebe:26500
- zeebe-importer=redis
- zeebe.client.worker.redis.connection=redis://redis:6379
ports:
- "8082:8082"
depends_on:
- zeebe-redis
networks:
- zeebe_network
profiles:
- redis_in_memory

simple-monitor-postgres:
container_name: zeebe-simple-monitor-postgres
image: ghcr.io/camunda-community-hub/zeebe-simple-monitor:2.5.2
Expand Down
6 changes: 6 additions & 0 deletions docker/redis/application.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
zeebe:
broker:
exporters:
redis:
className: io.zeebe.redis.exporter.RedisExporter
jarPath: exporters/zeebe-redis-exporter-jar-with-dependencies.jar
Binary file modified docs/how-it-works.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
12 changes: 12 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
<zeebe.version>8.3.4</zeebe.version>
<version.zeebe.spring>8.4.0</version.zeebe.spring>
<hazelcast.exporter.version>1.4.0</hazelcast.exporter.version>
<redis.exporter.version>0.9.9</redis.exporter.version>

<querydsl.version>5.0.0</querydsl.version>

Expand Down Expand Up @@ -73,6 +74,12 @@
<version>${hazelcast.exporter.version}</version>
</dependency>

<dependency>
<groupId>io.zeebe.redis</groupId>
<artifactId>zeebe-redis-connector</artifactId>
<version>${redis.exporter.version}</version>
</dependency>

<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
Expand Down Expand Up @@ -142,6 +149,11 @@
<artifactId>zeebe-hazelcast-connector</artifactId>
</dependency>

<dependency>
<groupId>io.zeebe.redis</groupId>
<artifactId>zeebe-redis-connector</artifactId>
</dependency>

<dependency>
<groupId>io.camunda</groupId>
<artifactId>zeebe-protocol-jackson</artifactId>
Expand Down
38 changes: 38 additions & 0 deletions src/main/java/io/zeebe/monitor/config/RedisConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package io.zeebe.monitor.config;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Configuration;

@ConditionalOnProperty(name = "zeebe-importer", havingValue = "redis")
@Configuration
public class RedisConfig {

@Value("${zeebe.client.worker.redis.connection}")
private String redisConnection;

@Value("${zeebe.client.worker.redis.consumer-group:simple-monitor}")
private String redisConumerGroup;

@Value("${zeebe.client.worker.redis.xread-count:500}")
private int redisXreadCount;

@Value("${zeebe.client.worker.redis.xread-block-millis:2000}")
private int redisXreadBlockMillis;

public String getRedisConnection() {
return redisConnection;
}

public String getRedisConumerGroup() {
return redisConumerGroup;
}

public int getRedisXreadCount() {
return redisXreadCount;
}

public int getRedisXreadBlockMillis() {
return redisXreadBlockMillis;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@
import io.zeebe.hazelcast.connect.java.ZeebeHazelcast;
import io.zeebe.monitor.entity.HazelcastConfig;
import io.zeebe.monitor.repository.HazelcastConfigRepository;
import io.zeebe.monitor.zeebe.hazelcast.importers.ErrorHazelcastImporter;
import io.zeebe.monitor.zeebe.hazelcast.importers.IncidentHazelcastImporter;
import io.zeebe.monitor.zeebe.hazelcast.importers.JobHazelcastImporter;
import io.zeebe.monitor.zeebe.hazelcast.importers.MessageHazelcastImporter;
import io.zeebe.monitor.zeebe.hazelcast.importers.MessageSubscriptionHazelcastImporter;
import io.zeebe.monitor.zeebe.hazelcast.importers.ProcessAndElementHazelcastImporter;
import io.zeebe.monitor.zeebe.hazelcast.importers.TimerHazelcastImporter;
import io.zeebe.monitor.zeebe.hazelcast.importers.VariableHazelcastImporter;
import io.zeebe.monitor.zeebe.protobuf.importers.ErrorProtobufImporter;
import io.zeebe.monitor.zeebe.protobuf.importers.IncidentProtobufImporter;
import io.zeebe.monitor.zeebe.protobuf.importers.JobProtobufImporter;
import io.zeebe.monitor.zeebe.protobuf.importers.MessageProtobufImporter;
import io.zeebe.monitor.zeebe.protobuf.importers.MessageSubscriptionProtobufImporter;
import io.zeebe.monitor.zeebe.protobuf.importers.ProcessAndElementProtobufImporter;
import io.zeebe.monitor.zeebe.protobuf.importers.TimerProtobufImporter;
import io.zeebe.monitor.zeebe.protobuf.importers.VariableProtobufImporter;
import java.util.function.Consumer;
import java.util.function.Function;
import org.springframework.beans.factory.annotation.Autowired;
Expand All @@ -21,14 +21,14 @@
@Component
public class HazelcastImportService {

@Autowired private ProcessAndElementHazelcastImporter processAndElementImporter;
@Autowired private VariableHazelcastImporter variableImporter;
@Autowired private JobHazelcastImporter jobImporter;
@Autowired private IncidentHazelcastImporter incidentImporter;
@Autowired private MessageHazelcastImporter messageImporter;
@Autowired private MessageSubscriptionHazelcastImporter messageSubscriptionImporter;
@Autowired private TimerHazelcastImporter timerImporter;
@Autowired private ErrorHazelcastImporter errorImporter;
@Autowired private ProcessAndElementProtobufImporter processAndElementImporter;
@Autowired private VariableProtobufImporter variableImporter;
@Autowired private JobProtobufImporter jobImporter;
@Autowired private IncidentProtobufImporter incidentImporter;
@Autowired private MessageProtobufImporter messageImporter;
@Autowired private MessageSubscriptionProtobufImporter messageSubscriptionImporter;
@Autowired private TimerProtobufImporter timerImporter;
@Autowired private ErrorProtobufImporter errorImporter;

@Autowired private HazelcastConfigRepository hazelcastConfigRepository;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.zeebe.monitor.zeebe.hazelcast.importers;
package io.zeebe.monitor.zeebe.protobuf.importers;

import io.zeebe.exporter.proto.Schema;
import io.zeebe.monitor.entity.ErrorEntity;
Expand All @@ -7,7 +7,7 @@
import org.springframework.stereotype.Component;

@Component
public class ErrorHazelcastImporter {
public class ErrorProtobufImporter {

@Autowired private ErrorRepository errorRepository;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.zeebe.monitor.zeebe.hazelcast.importers;
package io.zeebe.monitor.zeebe.protobuf.importers;

import io.camunda.zeebe.protocol.record.intent.IncidentIntent;
import io.zeebe.exporter.proto.Schema;
Expand All @@ -8,7 +8,7 @@
import org.springframework.stereotype.Component;

@Component
public class IncidentHazelcastImporter {
public class IncidentProtobufImporter {

@Autowired private IncidentRepository incidentRepository;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.zeebe.monitor.zeebe.hazelcast.importers;
package io.zeebe.monitor.zeebe.protobuf.importers;

import io.camunda.zeebe.protocol.record.intent.JobIntent;
import io.zeebe.exporter.proto.Schema;
Expand All @@ -8,7 +8,7 @@
import org.springframework.stereotype.Component;

@Component
public class JobHazelcastImporter {
public class JobProtobufImporter {

@Autowired private JobRepository jobRepository;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.zeebe.monitor.zeebe.hazelcast.importers;
package io.zeebe.monitor.zeebe.protobuf.importers;

import io.camunda.zeebe.protocol.record.intent.MessageIntent;
import io.zeebe.exporter.proto.Schema;
Expand All @@ -8,7 +8,7 @@
import org.springframework.stereotype.Component;

@Component
public class MessageHazelcastImporter {
public class MessageProtobufImporter {

@Autowired private MessageRepository messageRepository;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.zeebe.monitor.zeebe.hazelcast.importers;
package io.zeebe.monitor.zeebe.protobuf.importers;

import io.camunda.zeebe.protocol.record.intent.MessageStartEventSubscriptionIntent;
import io.camunda.zeebe.protocol.record.intent.MessageSubscriptionIntent;
Expand All @@ -10,7 +10,7 @@
import org.springframework.stereotype.Component;

@Component
public class MessageSubscriptionHazelcastImporter {
public class MessageSubscriptionProtobufImporter {

@Autowired private MessageSubscriptionRepository messageSubscriptionRepository;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.zeebe.monitor.zeebe.hazelcast.importers;
package io.zeebe.monitor.zeebe.protobuf.importers;

import io.camunda.zeebe.protocol.Protocol;
import io.camunda.zeebe.protocol.record.intent.Intent;
Expand All @@ -15,7 +15,7 @@
import org.springframework.stereotype.Component;

@Component
public class ProcessAndElementHazelcastImporter {
public class ProcessAndElementProtobufImporter {

@Autowired private ProcessRepository processRepository;
@Autowired private ProcessInstanceRepository processInstanceRepository;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.zeebe.monitor.zeebe.hazelcast.importers;
package io.zeebe.monitor.zeebe.protobuf.importers;

import io.camunda.zeebe.protocol.record.intent.TimerIntent;
import io.zeebe.exporter.proto.Schema;
Expand All @@ -8,7 +8,7 @@
import org.springframework.stereotype.Component;

@Component
public class TimerHazelcastImporter {
public class TimerProtobufImporter {

@Autowired private TimerRepository timerRepository;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.zeebe.monitor.zeebe.hazelcast.importers;
package io.zeebe.monitor.zeebe.protobuf.importers;

import io.zeebe.exporter.proto.Schema;
import io.zeebe.monitor.entity.VariableEntity;
Expand All @@ -7,7 +7,7 @@
import org.springframework.stereotype.Component;

@Component
public class VariableHazelcastImporter {
public class VariableProtobufImporter {

@Autowired private VariableRepository variableRepository;

Expand Down
Loading