Skip to content

Commit

Permalink
Merge branch '7.4.x' into 7.5.x
Browse files Browse the repository at this point in the history
  • Loading branch information
rayokota committed Sep 7, 2023
2 parents 4419c26 + 0c179e6 commit 3ebda26
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 21 deletions.
2 changes: 1 addition & 1 deletion checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
files="(AvroData|ConfigResource|DownloadSchemaRegistryMojo|KafkaSchemaRegistry|KafkaStore|KafkaStoreReaderThread|MessageDefinition|Schema|SchemaValue|SchemaDiff|MessageSchemaDiff|AbstractKafkaSchemaSerDe|AbstractKafkaAvroSerializer|AbstractKafkaAvroDeserializer|AbstractKafkaJsonSchemaDeserializer|AbstractKafkaProtobufDeserializer|ProtobufData|ProtobufSchemaUtils|JsonSchemaData|SchemaMessageFormatter|SchemaMessageReader|ContextFilter|QualifiedSubject|SubjectVersionsResource|Rule|WildcardMatcher|JsonSchemaComparator|LocalSchemaRegistryClient|DataEncryptionKeyId|FieldEncryptionExecutor).java"/>

<suppress checks="JavaNCSS"
files="(AbstractKafkaAvroSerializer|AbstractKafkaJsonSchemaSerializer|AbstractKafkaJsonSchemaDeserializer|AbstractKafkaProtobufSerializer|AbstractKafkaProtobufDeserializer|AvroData|AvroSchema|AvroSchemaUtils|ProtobufData|SchemaDiff|NumberSchemaDiff|JsonSchema|JsonSchemaData|KafkaSchemaRegistry|KafkaStoreReaderThread|ProtobufSchema|ProtobufSchemaUtils|JsonSchemaComparator|SchemaMessageFormatter|SchemaMessageReader).java"/>
files="(AbstractKafkaAvroSerializer|AbstractKafkaJsonSchemaSerializer|AbstractKafkaJsonSchemaDeserializer|AbstractKafkaProtobufSerializer|AbstractKafkaProtobufDeserializer|AbstractKafkaSchemaSerDe|AvroData|AvroSchema|AvroSchemaUtils|ProtobufData|SchemaDiff|NumberSchemaDiff|JsonSchema|JsonSchemaData|KafkaSchemaRegistry|KafkaStoreReaderThread|ProtobufSchema|ProtobufSchemaUtils|JsonSchemaComparator|SchemaMessageFormatter|SchemaMessageReader).java"/>

<suppress checks="MethodLength"
files="(AvroData|ProtobufSchema|ProtobufSchemaUtils).java"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,24 +134,16 @@ public Object transform(RuleContext ctx, Object message) throws RuleException {
input = message;
}
Object result = execute(ctx, input, ImmutableMap.of("message", input));
if (ctx.rule().getKind() == RuleKind.CONDITION) {
if (Boolean.TRUE.equals(result)) {
return message;
} else {
throw new RuleException("Expr failed: '" + ctx.rule().getExpr() + "'");
}
} else {
if (result instanceof Map) {
// Convert maps to the target object type
try {
JsonNode jsonNode = mapper.valueToTree(result);
result = ctx.target().fromJson(jsonNode);
} catch (IOException e) {
throw new RuleException(e);
}
if (result instanceof Map) {
// Convert maps to the target object type
try {
JsonNode jsonNode = mapper.valueToTree(result);
result = ctx.target().fromJson(jsonNode);
} catch (IOException e) {
throw new RuleException(e);
}
return result;
}
return result;
}

protected Object execute(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public JsonataExecutorTest() {
defaultConfig.put(KafkaAvroSerializerConfig.LATEST_COMPATIBILITY_STRICT, "false");
defaultConfig.put(KafkaAvroSerializerConfig.RULE_EXECUTORS, "jsonata");
defaultConfig.put(KafkaAvroSerializerConfig.RULE_EXECUTORS + ".jsonata.class",
JsonataExecutor.class.getName());
JsonataExecutor.class);
avroSerializer = new KafkaAvroSerializer(schemaRegistry, defaultConfig);
Map<String, Object> defaultConfig2 = new HashMap<>(defaultConfig);
defaultConfig2.put(KafkaAvroSerializerConfig.USE_LATEST_WITH_METADATA,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,9 +229,10 @@ private Stream<RuleBase> initRuleObject(
if (propertyValue == null) {
return Stream.empty();
}
String className = propertyValue.toString();
try {
RuleBase ruleObject = Utils.newInstance(className, RuleBase.class);
RuleBase ruleObject = propertyValue instanceof Class
? Utils.newInstance((Class<?>) propertyValue, RuleBase.class)
: Utils.newInstance(propertyValue.toString(), RuleBase.class);
configureRuleObject(ruleObject, name, config, configName);
return Stream.of(ruleObject);
} catch (ClassNotFoundException e) {
Expand Down Expand Up @@ -675,7 +676,23 @@ protected Object executeRules(
RuleExecutor ruleExecutor = getRuleExecutor(ctx);
if (ruleExecutor != null) {
try {
message = ruleExecutor.transform(ctx, message);
Object result = ruleExecutor.transform(ctx, message);
switch (rule.getKind()) {
case CONDITION:
if (Boolean.FALSE.equals(result)) {
String expr = rule.getExpr();
String errMsg = expr != null
? "Expr failed: '" + expr + "'"
: "Condition failed: '" + rule.getName() + "'";
throw new RuleException(errMsg);
}
break;
case TRANSFORM:
message = result;
break;
default:
throw new IllegalStateException("Unsupported rule kind " + rule.getKind());
}
runAction(ctx, ruleMode, rule,
message != null ? rule.getOnSuccess() : rule.getOnFailure(),
message, null, message != null ? null : ErrorAction.TYPE
Expand Down Expand Up @@ -719,6 +736,10 @@ private void runAction(RuleContext ctx, RuleMode ruleMode, Rule rule, String act
}
if (actionName != null) {
RuleAction ruleAction = getRuleAction(ctx, actionName);
if (ruleAction == null) {
log.error("Could not find rule action of type {}", actionName);
throw new ConfigException("Could not find rule action of type " + actionName);
}
try {
ruleAction.run(ctx, message, ex);
} catch (RuleException e) {
Expand Down

0 comments on commit 3ebda26

Please sign in to comment.