Releases: pellse/assembler
Assembler v0.7.7
What's Changed
- The Auto Caching API was renamed to Stream Table to better reflect what the API does, which is conceptually similar to a KTable in Apache Kafka (Kafka Streams):
Flux<BillingInfo> billingInfoFlux = ...; // From e.g. Debezium/Kafka, RabbitMQ, etc.
Flux<OrderItem> orderItemFlux = ...; // From e.g. Debezium/Kafka, RabbitMQ, etc.

var assembler = assemblerOf(Transaction.class)
    .withCorrelationIdResolver(Customer::customerId)
    .withRules(
        rule(BillingInfo::customerId,
            oneToOne(cached(call(this::getBillingInfo), streamTable(billingInfoFlux)))),
        rule(OrderItem::customerId,
            oneToMany(OrderItem::id, cachedMany(call(this::getAllOrders), streamTable(orderItemFlux)))),
        Transaction::new)
    .build();

var transactionFlux = getCustomers()
    .window(3)
    .flatMapSequential(assembler::assemble);
- Optimizations in the reactive read-write lock implementation and the `ReactiveGuard` high-level API
- Updated Project Reactor to version 3.7.0 to match the release of Spring Boot 3.4.0
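To make the KTable analogy concrete, here is a minimal plain-Java sketch of the idea: a table keyed by ID in which each incoming change event replaces the previous value for that key, so lookups read the latest state instead of calling the original data source. `StreamTableSketch`, `onEvent`, `lookup` and the `BillingInfo` fields are hypothetical names chosen for illustration; this is not the library's implementation, which operates on a reactive `Flux`.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical record, used only for this illustration.
record BillingInfo(long customerId, String creditCardNumber) {}

// Hypothetical sketch of a KTable-like view; not part of the Assembler API.
final class StreamTableSketch<K, V> {
    private final Map<K, V> table = new ConcurrentHashMap<>();
    private final Function<V, K> keyExtractor;

    StreamTableSketch(Function<V, K> keyExtractor) {
        this.keyExtractor = keyExtractor;
    }

    // Applies one change event: a later event for the same key replaces the earlier one.
    void onEvent(V value) {
        table.put(keyExtractor.apply(value), value);
    }

    // Reads the current state for a key without calling the original data source.
    Optional<V> lookup(K key) {
        return Optional.ofNullable(table.get(key));
    }

    // Number of distinct keys currently held in the table.
    int size() {
        return table.size();
    }
}
```

In this sketch, feeding two events for the same customer leaves a single entry holding the most recent one, which is the property that lets a rule be served from the stream rather than from a query.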
Assembler v0.7.6
What's Changed
- The use of embedded Assembler instances allows for the aggregation of sub-queries defined in rules. This new feature was heavily inspired by this great article from Vlad Mihalcea. It enables the modeling of a complex relationship graph (or, as mentioned in the article, a multi-level hierarchical structure) of disparate data sources (e.g., microservices, relational or non-relational databases, message queues, etc.) without triggering either N+1 queries or a Cartesian product.
See EmbeddedAssemblerTest.java for an example of how to use this new feature:
Assembler<UserVoteView, UserVote> userVoteAssembler = assemblerOf(UserVote.class)
    .withCorrelationIdResolver(UserVoteView::id)
    .withRules(
        rule(User::id, UserVoteView::userId, oneToOne(call(this::getUsersById))),
        UserVote::new
    )
    .build();

Assembler<PostComment, PostComment> postCommentAssembler = assemblerOf(PostComment.class)
    .withCorrelationIdResolver(PostComment::id)
    .withRules(
        rule(UserVote::commentId, oneToMany(UserVote::id, call(assemble(this::getUserVoteViewsById, userVoteAssembler)))),
        PostComment::new
    )
    .build();

Assembler<PostDetails, Post> postAssembler = assemblerOf(Post.class)
    .withCorrelationIdResolver(PostDetails::id)
    .withRules(
        rule(PostComment::postId, oneToMany(PostComment::id, call(assemble(this::getPostCommentsById, postCommentAssembler)))),
        rule(PostTag::postId, oneToMany(PostTag::id, call(this::getPostTagsById))),
        Post::new
    )
    .build();

// If getPostDetails() is a finite sequence
Flux<Post> posts = postAssembler.assemble(getPostDetails());

// If getPostDetails() is a continuous stream
Flux<Post> posts = getPostDetails()
    .windowTimeout(100, Duration.ofSeconds(5))
    .flatMapSequential(postAssembler::assemble);
- Major improvement in concurrency management, with better fairness in the reactive read-write lock implementation
- Support for configuring a `Scheduler` backed by a virtual thread executor
  - By default, `AssemblerBuilder` will use `Schedulers.boundedElastic()` if the `reactor.schedulers.defaultBoundedElasticOnVirtualThreads` system property is present
Assembler v0.7.5
What's Changed
This release introduces the concept of an ID join, which addresses the case where there is no direct correlation ID between a top-level entity and a sub-level entity.
For example, before this release, there was no way to express the relationship between a `PostDetails` and a `User`, because `User` doesn't have a `postId` field like `Reply` does, as described in #33.
record PostDetails(Long id, Long userId, String content) {
}

record User(Long Id, String username) { // No correlation Id back to PostDetails
}

record Reply(Long id, Long postId, Long userId, String content) {
}

record Post(PostDetails post, User author, List<Reply> replies) {
}

Assembler<PostDetails, Post> assembler = assemblerOf(Post.class)
    .withCorrelationIdResolver(PostDetails::id)
    .withRules(
        rule(XXXXX, oneToOne(call(PostDetails::userId, this::getUsersById))), // What should XXXXX be?
        rule(Reply::postId, oneToMany(Reply::id, call(this::getRepliesById))),
        Post::new)
    .build();
Since 0.7.5, this relationship can now be expressed:
Assembler<PostDetails, Post> assembler = assemblerOf(Post.class)
    .withCorrelationIdResolver(PostDetails::id)
    .withRules(
        rule(User::Id, PostDetails::userId, oneToOne(call(this::getUsersById))), // ID Join
        rule(Reply::postId, oneToMany(Reply::id, call(this::getRepliesById))),
        Post::new)
    .build();
This would be semantically equivalent to the following SQL query if all entities were stored in the same relational database:
SELECT
    p.id AS post_id,
    p.userId AS post_userId,
    p.content AS post_content,
    u.id AS author_id,
    u.username AS author_username,
    r.id AS reply_id,
    r.postId AS reply_postId,
    r.userId AS reply_userId,
    r.content AS reply_content
FROM
    PostDetails p
JOIN
    User u ON p.userId = u.id -- rule(User::Id, PostDetails::userId, ...)
LEFT JOIN
    Reply r ON p.id = r.postId -- rule(Reply::postId, ...)
WHERE
    p.id IN (1, 2, 3); -- withCorrelationIdResolver(PostDetails::id)
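For readers who prefer code to SQL, the same join can be sketched in plain Java without the library: collect the IDs once, issue one batched lookup per data source, then stitch the results together in memory. This is only an illustration of the semantics, not how Assembler is implemented. `IdJoinSketch`, its `assemble` helper and the two lookup functions are hypothetical, and the `User` component is spelled `id` here for idiomatic Java.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical sketch of the join semantics; not part of the Assembler API.
final class IdJoinSketch {
    record PostDetails(Long id, Long userId, String content) {}
    record User(Long id, String username) {}
    record Reply(Long id, Long postId, Long userId, String content) {}
    record Post(PostDetails post, User author, List<Reply> replies) {}

    static List<Post> assemble(List<PostDetails> posts,
                               Function<List<Long>, List<User>> getUsersById,
                               Function<List<Long>, List<Reply>> getRepliesById) {
        // One batched call per data source, however many posts there are (no N+1 queries).
        List<Long> userIds = posts.stream().map(PostDetails::userId).distinct().toList();
        List<Long> postIds = posts.stream().map(PostDetails::id).toList();

        // ID join: users are keyed by their own ID and matched against PostDetails::userId.
        Map<Long, User> usersById = getUsersById.apply(userIds).stream()
                .collect(Collectors.toMap(User::id, Function.identity()));

        // Regular correlation: replies point back to the post through Reply::postId.
        Map<Long, List<Reply>> repliesByPostId = getRepliesById.apply(postIds).stream()
                .collect(Collectors.groupingBy(Reply::postId));

        // Simplification: unlike the inner JOIN above, a missing user yields a null author here.
        return posts.stream()
                .map(p -> new Post(p, usersById.get(p.userId()),
                        repliesByPostId.getOrDefault(p.id(), List.of())))
                .toList();
    }
}
```

Because each post is matched against maps built from a single batched result, the work stays at one call per source and avoids the row multiplication a flat SQL join would produce.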
Assembler v0.7.4
This release brings 2 new features:
- Support for Spring Caching, allowing the use of any caching implementation supported by the Spring Framework (example taken from demo project https://github.com/pellse/assembler-spring-graphql-example):
import org.springframework.cache.CacheManager;
import static io.github.pellse.assembler.caching.spring.SpringCacheFactory.springCache;
...

@Controller
public class SpO2MonitoringGraphQLController {

    record SpO2Reading(SpO2 spO2, Patient patient, BodyMeasurement bodyMeasurement) {
    }

    private final Assembler<SpO2, SpO2Reading> spO2ReadingAssembler;

    SpO2MonitoringGraphQLController(
            PatientService ps,
            BodyMeasurementService bms,
            CacheManager cacheManager) {

        final var patientCache = cacheManager.getCache("patientCache");
        final var bodyMeasurementCache = cacheManager.getCache("bodyMeasurementCache");

        spO2ReadingAssembler = assemblerOf(SpO2Reading.class)
            .withCorrelationIdResolver(SpO2::patientId)
            .withRules(
                rule(Patient::id, oneToOne(cached(call(SpO2::healthCardNumber, ps::findPatientsByHealthCardNumber), springCache(patientCache)))),
                rule(BodyMeasurement::patientId, oneToOne(cached(call(bms::getBodyMeasurements), springCache(bodyMeasurementCache)))),
                SpO2Reading::new)
            .build();
    }
}
- Ability to configure read and write non-blocking bounded queues in `ConcurrentCache`
Assembler v0.7.3
This release primarily aims to enhance performance. It features the following updates:
- A redesigned Caching API that generically supports both single values and collections of values as cache entries, providing dual Map/MultiMap semantics.
- A comprehensive overhaul of the caching concurrency logic, leading to a substantial performance improvement over the previous version.
Assembler v0.7.2
This release introduces asynchronous caching for the default cache implementation and reverts the name of the library back to Assembler (previously CohereFlux).
What's Changed
Full Changelog: v0.7.1...v0.7.2
CohereFlux v0.7.1
This is a big release, with a re-architecture of the framework that allows query functions to access the whole entity from the upstream instead of having to rely solely on IDs.
What's Changed
- The framework is now called CohereFlux; the whole API was modified to reflect that change
- Entities `T` are now passed down the entire processing chain instead of IDs
- Adding `RuleMapperSource.call()` to invoke a queryFunction with a list of IDs instead of a Collection of top-level entities
- New `BatchRule` API
- Adding factory methods in `CaffeineCacheFactory` for max size and cache access expiry duration
- Replaced `RC` with `Flux<R>` in the CohereFlux interface; type parameters are now `CohereFlux<T, R>`
- FetchFunction in `CacheFactory` now returns an empty Map when no `RuleMapperSource` is defined
Assembler v0.6.6
What's Changed (from Git Commit messages)
- Adding `BatchRuleBuilder` and `BatchRule` for easier integration with Spring GraphQL
import io.github.pellse.reactive.assembler.Rule.BatchRule;

import static io.github.pellse.reactive.assembler.Rule.batchRule;
import static io.github.pellse.reactive.assembler.RuleMapper.oneToMany;
import static io.github.pellse.reactive.assembler.RuleMapper.oneToOne;
import static io.github.pellse.reactive.assembler.caching.AutoCacheFactory.autoCache;
import static io.github.pellse.reactive.assembler.caching.CacheFactory.cached;

@Controller
public class PatientObservationGraphQLController {

    private final PatientService patientService;
    private final BatchRule<Patient, BodyMeasurement> bodyMeasurementBatchRule;
    private final BatchRule<Patient, List<SpO2>> spO2BatchRule;

    PatientObservationGraphQLController(PatientService ps, BodyMeasurementService bms, SpO2StreamingService spO2ss) {
        this.patientService = ps;

        this.bodyMeasurementBatchRule = batchRule(BodyMeasurement::patientId, oneToOne(cached(bms::retrieveBodyMeasurements)))
            .withIdExtractor(Patient::id);

        this.spO2BatchRule = batchRule(SpO2::patientId, oneToMany(SpO2::id, cached(autoCache(spO2ss::spO2Flux))))
            .withIdExtractor(Patient::id);
    }

    @QueryMapping
    Flux<Patient> patients() {
        return patientService.findAllPatients();
    }

    @BatchMapping
    Mono<Map<Patient, BodyMeasurement>> bodyMeasurement(List<Patient> patients) {
        return bodyMeasurementBatchRule.executeToMono(patients);
    }

    @BatchMapping
    Flux<List<SpO2>> spO2(List<Patient> patients) {
        return spO2BatchRule.executeToFlux(patients);
    }
}
Full Changelog: v0.6.5...v0.6.6
Assembler v0.6.5
This release focuses on performance and consistency optimization of caching, including the auto caching feature.
What's Changed (from Git Commit messages)
- Cache concurrency optimization by switching to the `MULTIPLE_READERS` strategy when the caching fetchFunction is guaranteed to be empty
- Ability to configure Retry strategies with a specific `Scheduler` in the concurrent cache; the same `Scheduler` is used for auto cache and retry strategies
- Ref count instead of boolean flag to manage start/stop in `concurrentLifeCycleEventListener()`
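As a rough illustration of why a reference count is safer than a boolean flag for start/stop management, the sketch below only runs the underlying start action on the first `start()` and the underlying stop action on the last matching `stop()`. With a boolean flag, the first of several callers to stop would shut the resource down for everyone. `RefCountedLifecycle` and its callbacks are hypothetical names, and this is an assumption about the general technique rather than the code behind `concurrentLifeCycleEventListener()`.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of ref-counted start/stop; not the library's implementation.
final class RefCountedLifecycle {
    private final AtomicInteger refCount = new AtomicInteger();
    private final Runnable onStart;
    private final Runnable onStop;

    RefCountedLifecycle(Runnable onStart, Runnable onStop) {
        this.onStart = onStart;
        this.onStop = onStop;
    }

    // Only the transition 0 -> 1 triggers the underlying start.
    // Simplification: concurrent callers do not wait for onStart to finish.
    void start() {
        if (refCount.getAndIncrement() == 0) {
            onStart.run();
        }
    }

    // Only the transition 1 -> 0 triggers the underlying stop.
    // An unbalanced stop() (count already 0) is ignored instead of going negative.
    void stop() {
        int previous = refCount.getAndUpdate(n -> n > 0 ? n - 1 : 0);
        if (previous == 1) {
            onStop.run();
        }
    }
}
```

Two components sharing the same lifecycle can each call `start()` and `stop()` independently; the shared resource stays up until the second `stop()`.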
Breaking API changes:
- `concurrency()` factory methods renamed to `xxxRetryStrategy()` in `AutoCacheFactoryBuilder`
var assembler = assemblerOf(Transaction.class)
    .withCorrelationIdExtractor(Customer::customerId)
    .withAssemblerRules(
        rule(BillingInfo::customerId, oneToOne(cached(this::getBillingInfo,
            autoCacheBuilder(billingInfoFlux)
                .maxWindowSizeAndTime(100, ofSeconds(5))
                .errorHandler(error -> logger.log(WARNING, "Error in autoCache", error))
                .scheduler(newParallel("billing-info"))
                .maxRetryStrategy(50) // used to be named `concurrency()`
                .build()))),
        rule(OrderItem::customerId, oneToMany(OrderItem::id, cached(this::getAllOrders,
            autoCacheBuilder(orderItemFlux)
                .maxWindowSize(50)
                .errorHandler(onErrorMap(MyException::new))
                .scheduler(newParallel("order-item"))
                .backoffRetryStrategy(100, ofMillis(10)) // used to be named `concurrency()`
                .build()))),
        Transaction::new)
    .build();
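The retry strategies above bound how many times a contended cache operation is attempted and, optionally, how long to wait between attempts. A minimal plain-Java sketch of that idea follows, assuming the first number is the total number of attempts and using a fixed delay rather than a true backoff. `RetrySketch` and `retry` are hypothetical helpers, not the library's retry mechanism.

```java
import java.time.Duration;
import java.util.Optional;
import java.util.function.Supplier;

// Hypothetical sketch of a bounded retry with a fixed delay; not the library's implementation.
final class RetrySketch {

    // Calls `attempt` until it returns a value or `maxAttempts` calls have been made.
    // An empty Optional means "contended, try again".
    static <T> T retry(int maxAttempts, Duration delay, Supplier<Optional<T>> attempt) {
        for (int i = 1; i <= maxAttempts; i++) {
            Optional<T> result = attempt.get();
            if (result.isPresent()) {
                return result.get();
            }
            // Wait only between attempts, never after the last one.
            if (i < maxAttempts && !delay.isZero()) {
                sleep(delay);
            }
        }
        throw new IllegalStateException("Gave up after " + maxAttempts + " attempts");
    }

    private static void sleep(Duration delay) {
        try {
            Thread.sleep(delay.toMillis());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for callers
            throw new IllegalStateException("Interrupted while waiting to retry", e);
        }
    }
}
```

Under these assumptions, `retry(50, Duration.ZERO, ...)` loosely corresponds to a max-attempts strategy and `retry(100, Duration.ofMillis(10), ...)` to a delayed one.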
Dependency upgrades
- Project Reactor 3.5.6
- Kotlin 1.8.21
Assembler v0.6.4
What's Changed
- New `errorHandler()` methods on `AutoCacheFactoryBuilder` to simplify error handler configuration when we only want to log the error and continue processing
- New `concurrency()` method: ability to configure concurrency settings on `AutoCacheFactoryBuilder` for the underlying `ConcurrentCache`
import java.util.function.Consumer;

import reactor.core.publisher.Flux;
import io.github.pellse.reactive.assembler.Assembler;

import static io.github.pellse.reactive.assembler.AssemblerBuilder.assemblerOf;
import static io.github.pellse.reactive.assembler.RuleMapper.oneToMany;
import static io.github.pellse.reactive.assembler.RuleMapper.oneToOne;
import static io.github.pellse.reactive.assembler.Rule.rule;
import static io.github.pellse.reactive.assembler.caching.CacheFactory.cached;
import static io.github.pellse.reactive.assembler.caching.AutoCacheFactoryBuilder.autoCacheBuilder;
import static io.github.pellse.reactive.assembler.caching.AutoCacheFactory.OnErrorContinue.onErrorContinue;
import static java.time.Duration.*;
import static java.lang.System.Logger.Level.WARNING;
import static java.lang.System.getLogger;

var logger = getLogger("logger");
Consumer<Throwable> logWarning = error -> logger.log(WARNING, "Error in autoCache", error);

Flux<BillingInfo> billingInfoFlux = ...; // BillingInfo data coming from e.g. Kafka
Flux<OrderItem> orderItemFlux = ...; // OrderItem data coming from e.g. Kafka

var assembler = assemblerOf(Transaction.class)
    .withCorrelationIdExtractor(Customer::customerId)
    .withAssemblerRules(
        rule(BillingInfo::customerId, oneToOne(cached(this::getBillingInfo,
            autoCacheBuilder(billingInfoFlux)
                .maxWindowSizeAndTime(100, ofSeconds(5))
                .errorHandler(error -> logger.log(WARNING, error)) // New `errorHandler()` method
                .concurrency(20) // New `concurrency()` method -> maxAttempts = 20
                .build()))),
        rule(OrderItem::customerId, oneToMany(OrderItem::id, cached(this::getAllOrders,
            autoCacheBuilder(orderItemFlux)
                .maxWindowSize(50)
                .errorHandler(logWarning) // New `errorHandler()` method
                .concurrency(20, ofMillis(10)) // New `concurrency()` method -> maxAttempts = 20, delay = 10 ms
                .build()))),
        Transaction::new)
    .build();

var transactionFlux = getCustomers()
    .window(3)
    .flatMapSequential(assembler::assemble);
Dependency upgrades
- Caffeine 3.1.6