Skip to content

Commit

Permalink
feat(app): import process records instead of deployments
Browse files Browse the repository at this point in the history
* the new process records contains all required data
* import only events - commands and rejects are not needed
  • Loading branch information
saig0 committed May 14, 2021
1 parent 88cd818 commit f0e8f84
Showing 1 changed file with 34 additions and 38 deletions.
72 changes: 34 additions & 38 deletions src/main/java/io/zeebe/monitor/zeebe/ZeebeImportService.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import com.hazelcast.core.HazelcastInstance;
import io.camunda.zeebe.protocol.Protocol;
import io.camunda.zeebe.protocol.record.intent.DeploymentIntent;
import io.camunda.zeebe.protocol.record.intent.IncidentIntent;
import io.camunda.zeebe.protocol.record.intent.Intent;
import io.camunda.zeebe.protocol.record.intent.JobIntent;
Expand Down Expand Up @@ -75,27 +74,36 @@ public ZeebeHazelcast importFrom(final HazelcastInstance hazelcast) {

final var builder =
ZeebeHazelcast.newBuilder(hazelcast)
.addDeploymentListener(
record ->
withKey(record, Schema.DeploymentRecord::getMetadata, this::importDeployment))
.addProcessListener(
record -> ifEvent(record, Schema.ProcessRecord::getMetadata, this::importProcess))
.addProcessInstanceListener(
record ->
withKey(
ifEvent(
record,
Schema.ProcessInstanceRecord::getMetadata,
this::importProcessInstance))
.addIncidentListener(
record -> withKey(record, Schema.IncidentRecord::getMetadata, this::importIncident))
record -> ifEvent(record, Schema.IncidentRecord::getMetadata, this::importIncident))
.addJobListener(
record -> withKey(record, Schema.JobRecord::getMetadata, this::importJob))
record -> ifEvent(record, Schema.JobRecord::getMetadata, this::importJob))
.addVariableListener(
record -> withKey(record, Schema.VariableRecord::getMetadata, this::importVariable))
record -> ifEvent(record, Schema.VariableRecord::getMetadata, this::importVariable))
.addTimerListener(
record -> withKey(record, Schema.TimerRecord::getMetadata, this::importTimer))
record -> ifEvent(record, Schema.TimerRecord::getMetadata, this::importTimer))
.addMessageListener(
record -> withKey(record, Schema.MessageRecord::getMetadata, this::importMessage))
.addMessageSubscriptionListener(this::importMessageSubscription)
.addMessageStartEventSubscriptionListener(this::importMessageStartEventSubscription)
record -> ifEvent(record, Schema.MessageRecord::getMetadata, this::importMessage))
.addMessageSubscriptionListener(
record ->
ifEvent(
record,
Schema.MessageSubscriptionRecord::getMetadata,
this::importMessageSubscription))
.addMessageStartEventSubscriptionListener(
record ->
ifEvent(
record,
Schema.MessageStartEventSubscriptionRecord::getMetadata,
this::importMessageStartEventSubscription))
.addErrorListener(this::importError)
.postProcessListener(
sequence -> {
Expand All @@ -112,47 +120,35 @@ record -> withKey(record, Schema.MessageRecord::getMetadata, this::importMessage
return builder.build();
}

private <T> void withKey(
private <T> void ifEvent(
final T record,
final Function<T, Schema.RecordMetadata> extractor,
final Consumer<T> consumer) {
final var metadata = extractor.apply(record);
if (!hasKey(metadata)) {
if (isEvent(metadata)) {
consumer.accept(record);
}
}

private boolean hasKey(final Schema.RecordMetadata metadata) {
return metadata.getKey() < 0;
private boolean isEvent(final Schema.RecordMetadata metadata) {
return metadata.getRecordType() == Schema.RecordMetadata.RecordType.EVENT;
}

private void importDeployment(final Schema.DeploymentRecord record) {

final DeploymentIntent intent = DeploymentIntent.valueOf(record.getMetadata().getIntent());
private void importProcess(final Schema.ProcessRecord record) {
final int partitionId = record.getMetadata().getPartitionId();

if (intent != DeploymentIntent.CREATED || partitionId != Protocol.DEPLOYMENT_PARTITION) {
// ignore deployment event on other partitions to avoid duplicates
if (partitionId != Protocol.DEPLOYMENT_PARTITION) {
// ignore process event on other partitions to avoid duplicates
return;
}

record
.getResourcesList()
.forEach(
resource -> {
record.getProcessMetadataList().stream()
.filter(w -> w.getResourceName().equals(resource.getResourceName()))
.forEach(
processMetadata -> {
final ProcessEntity entity = new ProcessEntity();
entity.setKey(processMetadata.getProcessDefinitionKey());
entity.setBpmnProcessId(processMetadata.getBpmnProcessId());
entity.setVersion(processMetadata.getVersion());
entity.setResource(resource.getResource().toStringUtf8());
entity.setTimestamp(record.getMetadata().getTimestamp());
processRepository.save(entity);
});
});
final ProcessEntity entity = new ProcessEntity();
entity.setKey(record.getProcessDefinitionKey());
entity.setBpmnProcessId(record.getBpmnProcessId());
entity.setVersion(record.getVersion());
entity.setResource(record.getResource().toStringUtf8());
entity.setTimestamp(record.getMetadata().getTimestamp());
processRepository.save(entity);
}

private void importProcessInstance(final Schema.ProcessInstanceRecord record) {
Expand Down

0 comments on commit f0e8f84

Please sign in to comment.