Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Appropriate way to implement an error handling interceptor on suspend functions #8891

Closed
SolomanDavis opened this issue Mar 6, 2023 · 14 comments

Comments

@SolomanDavis
Copy link

Expected Behavior

I am trying to implement an AOP interceptor that intercepts kotlin suspend functions and appropriately handles / transforms errors for other interceptors and logic (e.g. gRPC).

With the below interceptor and example implementation using that interceptor, I am unable to get context.proceed() to bubble up the error that is thrown inside the suspend function implementation. Instead an internal COROUTINE_SUSPENDED value is returned and from what I can tell, the exception is thrown later when kotlin decides to unsuspend the coroutine however it is still never caught by any try catch or other error handlers in the interceptor.

Given the interceptor + implementation below - I expect the print statement in the catch {} block to be printed along with others in the following sequence:

Attempting to intercept method suspendFunc
printing result so kotlin doesn't optimize deferred - OK
EXPECTED PRINT - Encountered exception attempting on context.proceed()

Actual Behaviour

The order that actually gets printed is:

Attempting to intercept method suspendFunc
Proceeded with context and got result COROUTINE_SUSPENDED
printing result so kotlin doesn't optimize deferred - OK

Note that the expected print statement in the interceptor catch {} block is never executed.

Below are brief interceptor and example implementations - I have also provided an example repository. I got this output from running the one test there.

Interceptor:

@Singleton
@InterceptorBean(FaultyInterceptorAnnotation::class)
class FaultyInterceptor() : MethodInterceptor<Any?, Any?> {
    override fun intercept(context: MethodInvocationContext<Any?, Any?>): Any? {
        if (context.isSuspend) {
            println("Attempting to intercept method ${context.methodName}")

            try {
                val res = context.proceed()
                println("Proceeded with context and got result $res")
                return res
            } catch (ex: Throwable) {
                println("EXPECTED PRINT - Encountered exception attempting on context.proceed()")
                ex.printStackTrace()
                throw ex
            }

        } else {
            println("Method ${context.methodName} is not a suspend function")
            return context.proceed()
        }
    }
}

Example implementation

@Singleton
@FaultyInterceptorAnnotation
class ExampleImpl {
    suspend fun suspendFunc(): Boolean = coroutineScope {
        val deferred = async {
            delay(500)
            return@async "OK"
        }
        val result = deferred.await()
        println("printing result so kotlin doesn't optimize deferred - $result")
        throw RuntimeException("RUNTIME EXCEPTION THAT SHOULD BE THROWN AND CAUGHT")
    }
}

Steps To Reproduce

  1. Checkout https://github.com/SolomanDavis/micronaut-interceptor-suspend
  2. Run testItWorks

Environment Information

  • Operating System: Linux
  • JDK Version: !7
  • Kotlin Version: 1.6.10

Example Application

https://github.com/SolomanDavis/micronaut-interceptor-suspend

Version

3.6.2 -> 3.8.6

@SolomanDavis
Copy link
Author

SolomanDavis commented Mar 6, 2023

I have tried many different solutions to no avail including the following:

  • wrapping the context.proceed() call in a runBlocking {}
  • wrapping the context.proceed() call in a runBlocking {}, launch {}-ing the suspend function and then joining on the resulting job before returning from the interceptor.
  • Avoiding context.proceed() and directly doing the following:
try {
    val result = interceptedMethod.handleResult(interceptedMethod.interceptResultAsCompletionStage())
    return result
catch (ex: Throwable) {
    print("caught exception")
}

still result == COROUTINE_SUSPENDED and no error is caught in the try {} catch {}

Of course, I would much prefer not to avoid the context.proceed() to ensure that any other interceptors are invoked appropriately.

Do I need to restructure my error handling to support these errors being produced in CompletableFutures? Is it possible to acquire the result of context.proceed() in the form a CompletableFuture that I can then use?

@SolomanDavis
Copy link
Author

Does anyone have any idea on how to proceed for this?

@dstepanov
Copy link
Contributor

@SolomanDavis
Copy link
Author

I have attempted to implement the interceptor in this style multiple times but still get the same result. I've updated the linked example repo with a boiled down reproduction of this issue and copied the interceptor code here for convenience:

package live.bunch.interceptor

import io.micronaut.aop.InterceptedMethod
import io.micronaut.aop.InterceptorBean
import io.micronaut.aop.MethodInterceptor
import io.micronaut.aop.MethodInvocationContext
import jakarta.inject.Singleton
import java.util.concurrent.CompletableFuture
import java.util.concurrent.Executors

@Singleton
@InterceptorBean(FaultyInterceptorAnnotation::class)
class FaultyInterceptor() : MethodInterceptor<Any?, Any?> {
    override fun intercept(context: MethodInvocationContext<Any?, Any?>): Any? {
        val executorService = Executors.newSingleThreadExecutor()

        if (context.isSuspend) {
            val interceptedMethod = InterceptedMethod.of(context)
            println("Attempting to intercept method ${context.methodName}")

            try {
                val res = interceptedMethod.handleResult(
                    CompletableFuture.supplyAsync(
                        { interceptedMethod.interceptResultAsCompletionStage() },
                        executorService
                    )
                )
                println("Proceeded with context and got result $res")
                return res
            } catch (ex: Throwable) {
                println("EXPECTED PRINT - Encountered exception attempting on context.proceed()")
                ex.printStackTrace()
                throw ex
            }

        } else {
            println("Method ${context.methodName} is not a suspend function")
            return context.proceed()
        }
    }
}

When using this interceptor, no exception is caught (i.e. EXPECTED PRINT - Encountered exception attempting on context.proceed() is never printed) and the result is COROUTINE_SUSPENDED:
Proceeded with context and got result COROUTINE_SUSPENDED

@graemerocher
Copy link
Contributor

you might want to look at

for inspiration

@dstepanov
Copy link
Contributor

The res should be completable future and it will have the exception propagated in it

@SolomanDavis
Copy link
Author

I'm still taking a look at using the RecoveryInterceptor style although I don't see much difference between that and AsyncInterceptor - unless I'm missing the intent there?

But yes, the res value is not a CompletableFuture. It's an Any! that can either be null, the actual result value, or CoroutineSingletons.COROUTINE_SUSPENDED special value indicating that the coroutine is still suspended.

The problem is that since this isn't a CompletableFuture, it's very difficult to do much error handling or other handling that depends on the value of res, because the interceptor is running while res is not yet realized and no error has yet been thrown.

@SolomanDavis
Copy link
Author

SolomanDavis commented Apr 12, 2023

Updating the try catch logic within that conditional like so to hopefully give some insight into what is happening with the completable:

            val interceptedMethod = InterceptedMethod.of(context)
            println("Attempting to intercept method ${context.methodName}")

            try {
                val res = interceptedMethod.handleResult(
                    CompletableFuture.supplyAsync(
                        {
                            interceptedMethod.interceptResultAsCompletionStage()
                                .exceptionally { ex ->
                                    println("intercepted error: $ex")
                                    return@exceptionally null
                                }
                        },
                        executorService
                    )
                )
                println("Proceeded with context and got result $res")
                return res
            } catch (ex: Throwable) {
                println("EXPECTED PRINT - Encountered exception attempting on context.proceed()")
                ex.printStackTrace()
                throw ex
            }

This is what's printed:

Attempting to intercept method suspendFunc
Proceeded with context and got result COROUTINE_SUSPENDED
intercepted error: kotlinx.coroutines.JobCancellationException: Parent job is Cancelling; job="coroutine#1":BlockingCoroutine{Cancelled}@6a0ac48e

I think I am expecting handleResult to act like a coroutine and return once it's done and has a value or exception. But maybe that's not the correct way to think about it?

@dstepanov
Copy link
Contributor

Sorry, I have misled you.
interceptResultAsCompletionStage will produce the CompletableFuture where you can do error handling.

@SolomanDavis
Copy link
Author

If I understand correctly, that is the intention behind interceptedMethod::handleResult correct?
Looking at its implementation:

    @Override
    public Object handleResult(Object result) {
        CompletionStage completionStageResult;
        if (result instanceof CompletionStage) {
            completionStageResult = (CompletionStage<?>) result;
        } else {
            throw new IllegalStateException("Cannot convert " + result + "  to 'java.util.concurrent.CompletionStage'");
        }
        return KotlinInterceptedMethodHelper.handleResult(completionStageResult, isUnitValueType, continuation);
    }

that helper method eventually calls suspendCoroutine { continuation -> ... } and resumes the continuation either with a result or with an exception depending. I think I understand why this is producing a COROUTINE_SUSPENDED as a result of that suspendCoroutine block, but what I don't understand is how to integrate this with the Java micronaut interceptor world that doesn't understand suspension.

While it is suspended, a value is being returned. Are you suggesting I wait on the future returned from interceptResultAsCompletionStage() rather than call .handleResult()?

@SolomanDavis
Copy link
Author

Waiting on the future explicitly actually causes execution to hang:

            try {
                return interceptedMethod.interceptResultAsCompletionStage()
                    .whenComplete { result, ex ->
                        if (result != null) {
                            println("intercepted result: $result")
                        }
                        if (ex != null) {
                            println("intercepted error: $ex")
                        }
                        println("intercepted result: $result, ex: $ex")
                    }
                    .toCompletableFuture()
                    .get()
            } catch (ex: Throwable) {
                println("EXPECTED PRINT - Encountered exception attempting on context.proceed()")
                ex.printStackTrace()
                throw ex
            }

Only Attempting to intercept method suspendFunc is printed and then runs indefinitely

@SolomanDavis
Copy link
Author

Are there any other suggestions w.r.t. this issue? None of these strategies seem to work. There is an additional piece of context which can help understand why this does not work:

We are using [grpc-kotlin] and we are facing this issue when implementing a suspend fun interceptor. grpc-kotlin requires us to implement suspend fun methods for endpoints, but we want to consolidate exception handling (i.e. mapping certain exceptions to certain gRPC status codes) to an interceptor or something of the like.

The problem seems to be that even when including the exception handling and mapping and other tasks in the CompletableFuture chain, the raw exception is being caught by gRPC which they will translate to an UNKNOWN. We have exception handling logic to map exceptions to gRPC status codes.

@dstepanov
Copy link
Contributor

dstepanov commented Apr 18, 2023

               return interceptedMethod.interceptResultAsCompletionStage()
                    .whenComplete { result, ex ->
                        if (result != null) {
                            println("intercepted result: $result")
                        }
                        if (ex != null) {
                            println("intercepted error: $ex")
                        }
                        println("intercepted result: $result, ex: $ex")
                    }
                    .toCompletableFuture()
                    .get()

Don't block the future, pass it to handleResult:

               return interceptedMethod.handleResult(interceptedMethod.interceptResultAsCompletionStage()
                    .whenComplete { result, ex ->
                        if (result != null) {
                            println("intercepted result: $result")
                        }
                        if (ex != null) {
                            println("intercepted error: $ex")
                        }
                        println("intercepted result: $result, ex: $ex")
                    });

@SolomanDavis
Copy link
Author

SolomanDavis commented May 11, 2023

Awesome, thank you very much for this. Solved with the following usage:

        val interceptedMethod = InterceptedMethod.of(context)
        if (context.isSuspend) {
            val resultFuture = CompletableFuture<Any?>()
            // ...
            try {
                interceptedMethod.interceptResultAsCompletionStage()
                    .whenComplete { res, ex ->
                        if (ex == null) {
                            resultFuture.complete(res)
                        } else {
                            // Handle errors that occur when the intercepted method suspends ...
                            resultFuture.completeExceptionally(ex)
                        }
                    }
            } catch (ex: Throwable) {
                // Handle errors that occur when the intercepted method never suspends ...
                resultFuture.completeExceptionally(ex)
            }
            // ...
            return interceptedMethod.handleResult(resultFuture)
        }

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants