Skip to content

Commit

Permalink
Add Coroutines support to Spring AOP
Browse files Browse the repository at this point in the history
This commit adds support for Kotlin Coroutines to Spring AOP
by leveraging CoroutinesUtils#invokeSuspendingFunction in
AopUtils#invokeJoinpointUsingReflection to convert it to the
equivalent Publisher return value, like in other parts of Spring
Framework.

That allows method interceptors with Reactive support to process
related return values.

CglibAopProxy#processReturnType and JdkDynamicAopProxy#invoke
take care of the conversion from the Publisher return value
to Kotlin Coroutines.

Reactive transactional and HTTP service interface support
have been refined to leverage those new generic capabilities.

Closes gh-22462
  • Loading branch information
sdeleuze committed Aug 25, 2023
1 parent 9b3f456 commit c8169e5
Show file tree
Hide file tree
Showing 11 changed files with 278 additions and 51 deletions.
2 changes: 2 additions & 0 deletions integration-tests/integration-tests.gradle
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
plugins {
id 'org.springframework.build.runtimehints-agent'
id 'kotlin'
}

description = "Spring Integration Tests"
Expand All @@ -26,6 +27,7 @@ dependencies {
testImplementation("org.aspectj:aspectjweaver")
testImplementation("org.hsqldb:hsqldb")
testImplementation("org.hibernate:hibernate-core-jakarta")
testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor")
}

normalization {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package org.springframework.aop.framework.autoproxy

import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import org.aopalliance.intercept.MethodInterceptor
import org.aopalliance.intercept.MethodInvocation
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Test
import org.springframework.aop.framework.autoproxy.AspectJAutoProxyInterceptorKotlinIntegrationTests.InterceptorConfig
import org.springframework.aop.support.StaticMethodMatcherPointcutAdvisor
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.context.annotation.EnableAspectJAutoProxy
import org.springframework.test.annotation.DirtiesContext
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig
import reactor.core.publisher.Mono
import java.lang.reflect.Method


/**
* Integration tests for interceptors with Kotlin (with and without Coroutines) configured
* via AspectJ auto-proxy support.
*/
@SpringJUnitConfig(InterceptorConfig::class)
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
class AspectJAutoProxyInterceptorKotlinIntegrationTests(
@Autowired val echo: Echo,
@Autowired val firstAdvisor: TestPointcutAdvisor,
@Autowired val secondAdvisor: TestPointcutAdvisor) {

@Test
fun `Multiple interceptors with regular function`() {
assertThat(firstAdvisor.interceptor.invocations).isEmpty()
assertThat(secondAdvisor.interceptor.invocations).isEmpty()
val value = "Hello!"
assertThat(echo.echo(value)).isEqualTo(value)
assertThat(firstAdvisor.interceptor.invocations).singleElement().matches { String::class.java.isAssignableFrom(it) }
assertThat(secondAdvisor.interceptor.invocations).singleElement().matches { String::class.java.isAssignableFrom(it) }
}

@Test
fun `Multiple interceptors with suspending function`() {
assertThat(firstAdvisor.interceptor.invocations).isEmpty()
assertThat(secondAdvisor.interceptor.invocations).isEmpty()
val value = "Hello!"
runBlocking {
assertThat(echo.suspendingEcho(value)).isEqualTo(value)
}
assertThat(firstAdvisor.interceptor.invocations).singleElement().matches { Mono::class.java.isAssignableFrom(it) }
assertThat(secondAdvisor.interceptor.invocations).singleElement().matches { Mono::class.java.isAssignableFrom(it) }
}

@Configuration
@EnableAspectJAutoProxy
open class InterceptorConfig {

@Bean
open fun firstAdvisor() = TestPointcutAdvisor().apply { order = 0 }

@Bean
open fun secondAdvisor() = TestPointcutAdvisor().apply { order = 1 }


@Bean
open fun echo(): Echo {
return Echo()
}
}

class TestMethodInterceptor: MethodInterceptor {

var invocations: MutableList<Class<*>> = mutableListOf()

@Suppress("RedundantNullableReturnType")
override fun invoke(invocation: MethodInvocation): Any? {
val result = invocation.proceed()
invocations.add(result!!.javaClass)
return result
}

}

class TestPointcutAdvisor : StaticMethodMatcherPointcutAdvisor(TestMethodInterceptor()) {

val interceptor: TestMethodInterceptor
get() = advice as TestMethodInterceptor

override fun matches(method: Method, targetClass: Class<*>): Boolean {
return targetClass == Echo::class.java && method.name.lowercase().endsWith("echo")
}
}

open class Echo {

open fun echo(value: String): String {
return value;
}

open suspend fun suspendingEcho(value: String): String {
delay(1)
return value;
}

}

}
4 changes: 4 additions & 0 deletions spring-aop/spring-aop.gradle
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
description = "Spring AOP"

apply plugin: "kotlin"

dependencies {
api(project(":spring-beans"))
api(project(":spring-core"))
optional("org.apache.commons:commons-pool2")
optional("org.aspectj:aspectjweaver")
optional("org.jetbrains.kotlinx:kotlinx-coroutines-reactor")
testFixturesImplementation(testFixtures(project(":spring-beans")))
testFixturesImplementation(testFixtures(project(":spring-core")))
testFixturesImplementation("com.google.code.findbugs:jsr305")
testImplementation(project(":spring-core-test"))
testImplementation(testFixtures(project(":spring-beans")))
testImplementation(testFixtures(project(":spring-core")))
testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor")
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.springframework.cglib.proxy.MethodProxy;
import org.springframework.cglib.proxy.NoOp;
import org.springframework.core.KotlinDetector;
import org.springframework.core.MethodParameter;
import org.springframework.core.SmartClassLoader;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
Expand Down Expand Up @@ -75,6 +76,7 @@
* @author Ramnivas Laddad
* @author Chris Beams
* @author Dave Syer
* @author Sebastien Deleuze
* @see org.springframework.cglib.proxy.Enhancer
* @see AdvisedSupport#setProxyTargetClass
* @see DefaultAopProxyFactory
Expand All @@ -98,6 +100,8 @@ class CglibAopProxy implements AopProxy, Serializable {
/** Keeps track of the Classes that we have validated for final methods. */
private static final Map<Class<?>, Boolean> validatedClasses = new WeakHashMap<>();

private static final String COROUTINES_FLOW_CLASS_NAME = "kotlinx.coroutines.flow.Flow";


/** The configuration used to configure this proxy. */
protected final AdvisedSupport advised;
Expand Down Expand Up @@ -399,10 +403,11 @@ private static boolean implementsInterface(Method method, Set<Class<?>> ifcs) {
/**
* Process a return value. Wraps a return of {@code this} if necessary to be the
* {@code proxy} and also verifies that {@code null} is not returned as a primitive.
* Also takes care of the conversion from {@code Mono} to Kotlin Coroutines if needed.
*/
@Nullable
private static Object processReturnType(
Object proxy, @Nullable Object target, Method method, @Nullable Object returnValue) {
Object proxy, @Nullable Object target, Method method, Object[] arguments, @Nullable Object returnValue) {

// Massage return value if necessary
if (returnValue != null && returnValue == target &&
Expand All @@ -416,6 +421,11 @@ private static Object processReturnType(
throw new AopInvocationException(
"Null return value from advice does not match primitive return type for: " + method);
}
if (KotlinDetector.isSuspendingFunction(method)) {
return COROUTINES_FLOW_CLASS_NAME.equals(new MethodParameter(method, -1).getParameterType().getName()) ?
CoroutinesUtils.asFlow(returnValue) :
CoroutinesUtils.awaitSingleOrNull(returnValue, arguments[arguments.length - 1]);
}
return returnValue;
}

Expand Down Expand Up @@ -446,7 +456,7 @@ public StaticUnadvisedInterceptor(@Nullable Object target) {
@Nullable
public Object intercept(Object proxy, Method method, Object[] args, MethodProxy methodProxy) throws Throwable {
Object retVal = AopUtils.invokeJoinpointUsingReflection(this.target, method, args);
return processReturnType(proxy, this.target, method, retVal);
return processReturnType(proxy, this.target, method, args, retVal);
}
}

Expand All @@ -471,7 +481,7 @@ public Object intercept(Object proxy, Method method, Object[] args, MethodProxy
try {
oldProxy = AopContext.setCurrentProxy(proxy);
Object retVal = AopUtils.invokeJoinpointUsingReflection(this.target, method, args);
return processReturnType(proxy, this.target, method, retVal);
return processReturnType(proxy, this.target, method, args, retVal);
}
finally {
AopContext.setCurrentProxy(oldProxy);
Expand Down Expand Up @@ -499,7 +509,7 @@ public Object intercept(Object proxy, Method method, Object[] args, MethodProxy
Object target = this.targetSource.getTarget();
try {
Object retVal = AopUtils.invokeJoinpointUsingReflection(target, method, args);
return processReturnType(proxy, target, method, retVal);
return processReturnType(proxy, target, method, args, retVal);
}
finally {
if (target != null) {
Expand Down Expand Up @@ -529,7 +539,7 @@ public Object intercept(Object proxy, Method method, Object[] args, MethodProxy
try {
oldProxy = AopContext.setCurrentProxy(proxy);
Object retVal = AopUtils.invokeJoinpointUsingReflection(target, method, args);
return processReturnType(proxy, target, method, retVal);
return processReturnType(proxy, target, method, args, retVal);
}
finally {
AopContext.setCurrentProxy(oldProxy);
Expand Down Expand Up @@ -656,7 +666,7 @@ public Object intercept(Object proxy, Method method, Object[] args, MethodProxy
proxy, this.target, method, args, this.targetClass, this.adviceChain, methodProxy);
// If we get here, we need to create a MethodInvocation.
Object retVal = invocation.proceed();
retVal = processReturnType(proxy, this.target, method, retVal);
retVal = processReturnType(proxy, this.target, method, args, retVal);
return retVal;
}
}
Expand Down Expand Up @@ -706,7 +716,7 @@ public Object intercept(Object proxy, Method method, Object[] args, MethodProxy
// We need to create a method invocation...
retVal = new CglibMethodInvocation(proxy, target, method, args, targetClass, chain, methodProxy).proceed();
}
return processReturnType(proxy, target, method, retVal);
return processReturnType(proxy, target, method, args, retVal);
}
finally {
if (target != null && !targetSource.isStatic()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.aop.framework;

import kotlin.coroutines.Continuation;
import kotlinx.coroutines.reactive.ReactiveFlowKt;
import kotlinx.coroutines.reactor.MonoKt;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;

import org.springframework.lang.Nullable;

/**
* Package-visible class designed to avoid a hard dependency on Kotlin and Coroutines dependency at runtime.
*
* @author Sebastien Deleuze
* @since 6.1.0
*/
abstract class CoroutinesUtils {

static Object asFlow(Object publisher) {
return ReactiveFlowKt.asFlow((Publisher<?>) publisher);
}

@SuppressWarnings("unchecked")
@Nullable
static Object awaitSingleOrNull(Object mono, Object continuation) {
return MonoKt.awaitSingleOrNull((Mono<?>) mono, (Continuation<Object>) continuation);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import org.springframework.aop.TargetSource;
import org.springframework.aop.support.AopUtils;
import org.springframework.core.DecoratingProxy;
import org.springframework.core.KotlinDetector;
import org.springframework.core.MethodParameter;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
Expand Down Expand Up @@ -58,6 +60,7 @@
* @author Rob Harrop
* @author Dave Syer
* @author Sergey Tsypanov
* @author Sebastien Deleuze
* @see java.lang.reflect.Proxy
* @see AdvisedSupport
* @see ProxyFactory
Expand All @@ -80,6 +83,8 @@ final class JdkDynamicAopProxy implements AopProxy, InvocationHandler, Serializa
/** We use a static Log to avoid serialization issues. */
private static final Log logger = LogFactory.getLog(JdkDynamicAopProxy.class);

private static final String COROUTINES_FLOW_CLASS_NAME = "kotlinx.coroutines.flow.Flow";

/** Config used to configure this proxy. */
private final AdvisedSupport advised;

Expand Down Expand Up @@ -258,6 +263,10 @@ else if (retVal == null && returnType != Void.TYPE && returnType.isPrimitive())
throw new AopInvocationException(
"Null return value from advice does not match primitive return type for: " + method);
}
if (KotlinDetector.isSuspendingFunction(method)) {
return COROUTINES_FLOW_CLASS_NAME.equals(new MethodParameter(method, -1).getParameterType().getName()) ?
CoroutinesUtils.asFlow(retVal) : CoroutinesUtils.awaitSingleOrNull(retVal, args[args.length - 1]);
}
return retVal;
}
finally {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -25,6 +25,11 @@
import java.util.List;
import java.util.Set;

import kotlin.coroutines.Continuation;
import kotlin.coroutines.CoroutineContext;
import kotlinx.coroutines.Job;
import org.reactivestreams.Publisher;

import org.springframework.aop.Advisor;
import org.springframework.aop.AopInvocationException;
import org.springframework.aop.IntroductionAdvisor;
Expand All @@ -35,6 +40,8 @@
import org.springframework.aop.SpringProxy;
import org.springframework.aop.TargetClassAware;
import org.springframework.core.BridgeMethodResolver;
import org.springframework.core.CoroutinesUtils;
import org.springframework.core.KotlinDetector;
import org.springframework.core.MethodIntrospector;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
Expand All @@ -53,6 +60,7 @@
* @author Rod Johnson
* @author Juergen Hoeller
* @author Rob Harrop
* @author Sebastien Deleuze
* @see org.springframework.aop.framework.AopProxyUtils
*/
public abstract class AopUtils {
Expand Down Expand Up @@ -340,7 +348,8 @@ public static Object invokeJoinpointUsingReflection(@Nullable Object target, Met
// Use reflection to invoke the method.
try {
ReflectionUtils.makeAccessible(method);
return method.invoke(target, args);
return KotlinDetector.isSuspendingFunction(method) ?
KotlinDelegate.invokeSuspendingFunction(method, target, args) : method.invoke(target, args);
}
catch (InvocationTargetException ex) {
// Invoked method threw a checked exception.
Expand All @@ -356,4 +365,17 @@ public static Object invokeJoinpointUsingReflection(@Nullable Object target, Met
}
}

/**
* Inner class to avoid a hard dependency on Kotlin at runtime.
*/
private static class KotlinDelegate {

public static Publisher<?> invokeSuspendingFunction(Method method, Object target, Object... args) {
Continuation<?> continuation = (Continuation<?>) args[args.length -1];
CoroutineContext context = continuation.getContext().minusKey(Job.Key);
return CoroutinesUtils.invokeSuspendingFunction(context, method, target, args);
}

}

}
Loading

0 comments on commit c8169e5

Please sign in to comment.