Skip to content

Commit

Permalink
fix: remove maxTime from StandardRetryStrategy (#624)
Browse files Browse the repository at this point in the history
Removes a maximum per/operation timeout from the StandardRetryStrategy.
There is no sensible default for this timeout since an operation could
involve a large request stream. It also subjected operations to timeout
in highly concurrent environments with lots of coroutines if the
coroutine was starved and just didn't get a chance to run.

fixes: awslabs/aws-sdk-kotlin/issues/572
  • Loading branch information
aajtodd authored Apr 7, 2022
1 parent 4aa0376 commit a0c1772
Show file tree
Hide file tree
Showing 10 changed files with 21 additions and 125 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,10 @@

package aws.smithy.kotlin.runtime.retries

import aws.smithy.kotlin.runtime.retries.delay.DelayProvider
import aws.smithy.kotlin.runtime.retries.delay.RetryCapacityExceededException
import aws.smithy.kotlin.runtime.retries.delay.RetryToken
import aws.smithy.kotlin.runtime.retries.delay.RetryTokenBucket
import aws.smithy.kotlin.runtime.retries.delay.*
import aws.smithy.kotlin.runtime.retries.policy.RetryDirective
import aws.smithy.kotlin.runtime.retries.policy.RetryPolicy
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.TimeoutCancellationException
import kotlinx.coroutines.withTimeout
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds

/**
* Implements a retry strategy utilizing backoff delayer and a token bucket for rate limiting and circuit breaking. Note
Expand All @@ -27,18 +20,16 @@ import kotlin.time.Duration.Companion.milliseconds
* @param delayProvider A delayer that can back off after the initial try to spread out the retries.
*/
class StandardRetryStrategy(
val options: StandardRetryStrategyOptions,
private val tokenBucket: RetryTokenBucket,
private val delayProvider: DelayProvider,
val options: StandardRetryStrategyOptions = StandardRetryStrategyOptions.Default,
private val tokenBucket: RetryTokenBucket = StandardRetryTokenBucket(),
private val delayProvider: DelayProvider = ExponentialBackoffWithJitter()
) : RetryStrategy {
/**
* Retry the given block of code until it's successful. Note this method throws exceptions for non-successful
* outcomes from retrying.
*/
override suspend fun <R> retry(policy: RetryPolicy<R>, block: suspend () -> R): Outcome<R> =
withTimeout(options.maxTime) {
doTryLoop(block, policy, 1, tokenBucket.acquireToken(), null)
}
doTryLoop(block, policy, 1, tokenBucket.acquireToken())

/**
* Perform a single iteration of the try loop. Execute the block of code, evaluate the result, and take action to
Expand All @@ -49,20 +40,16 @@ class StandardRetryStrategy(
* @param fromToken A [RetryToken] which grants the strategy capacity to execute a try. This token is resolved
* inside the function by calling [notifySuccess][RetryToken.notifySuccess],
* [notifyFailure][RetryToken.notifyFailure], or [scheduleRetry][RetryToken.scheduleRetry].
* @param previousResult The [Result] from the prior loop iteration. This is used in the case of a timeout to
* include in the thrown exception.
* @return The successful [Outcome] from the final try.
*/
private tailrec suspend fun <R> doTryLoop(
block: suspend () -> R,
policy: RetryPolicy<R>,
attempt: Int,
fromToken: RetryToken,
previousResult: Result<R>?,
): Outcome<R> {
val callResult = runCatching { block() }
when (val ex = callResult.exceptionOrNull()) {
is TimeoutCancellationException -> throwTimeOut(fromToken, attempt, previousResult)
is CancellationException -> throw ex
}

Expand All @@ -83,16 +70,14 @@ class StandardRetryStrategy(
fromToken.scheduleRetry(evaluation.reason)
}
}
} catch (ex: TimeoutCancellationException) {
throwTimeOut(fromToken, attempt, callResult)
} catch (ex: RetryCapacityExceededException) {
throwCapacityExceeded(ex, attempt, callResult)
} catch (ex: Throwable) {
fromToken.notifyFailure()
throw ex
}

return doTryLoop(block, policy, attempt + 1, nextToken, callResult)
return doTryLoop(block, policy, attempt + 1, nextToken)
}

/**
Expand Down Expand Up @@ -136,26 +121,6 @@ class StandardRetryStrategy(
else -> throw ex
}

/**
* Handles the termination of the retry loop because too much time has elapsed by marking the [RetryToken] as failed
* and throwing a [TimedOutException].
* @param token The [RetryToken] used in the attempt that was waiting or executing when the timeout occurred.
* @param attempt The number of attempts completed.
* @param previousResult The last result that was received (i.e., from the prior loop iteration).
*/
private suspend fun <R> throwTimeOut(token: RetryToken, attempt: Int, previousResult: Result<R>?): Nothing {
token.notifyFailure()
when (val ex = previousResult?.exceptionOrNull()) {
null -> throw TimedOutException(
"Took more than ${options.maxTime} to yield a result",
attempt,
previousResult?.getOrNull(),
previousResult?.exceptionOrNull(),
)
else -> throw ex
}
}

/**
* Handles the termination of the retry loop because too many attempts have been made by throwing a
* [TimedOutException].
Expand All @@ -178,14 +143,13 @@ class StandardRetryStrategy(

/**
* Defines configuration for a [StandardRetryStrategy].
* @param maxTime The maximum amount of time to retry.
* @param maxAttempts The maximum number of attempts to make (including the first attempt).
*/
data class StandardRetryStrategyOptions(val maxTime: Duration, val maxAttempts: Int) {
data class StandardRetryStrategyOptions(val maxAttempts: Int) {
companion object {
/**
* The default retry strategy configuration.
*/
val Default = StandardRetryStrategyOptions(maxTime = 20_000.milliseconds, maxAttempts = 3)
val Default = StandardRetryStrategyOptions(maxAttempts = 3)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ import kotlin.time.DurationUnit
*
* @param options The configuration to use for this delayer.
*/
class ExponentialBackoffWithJitter(val options: ExponentialBackoffWithJitterOptions) : DelayProvider {
class ExponentialBackoffWithJitter(
val options: ExponentialBackoffWithJitterOptions = ExponentialBackoffWithJitterOptions.Default
) : DelayProvider {
private val random = Random.Default

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ private const val MS_PER_S = 1_000
* @param clock A clock to use for time calculations.
*/
class StandardRetryTokenBucket(
val options: StandardRetryTokenBucketOptions,
val options: StandardRetryTokenBucketOptions = StandardRetryTokenBucketOptions.Default,
private val clock: Clock = Clock.System,
) : RetryTokenBucket {
internal var capacity = options.maxCapacity
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,8 @@ import aws.smithy.kotlin.runtime.retries.policy.RetryDirective
import aws.smithy.kotlin.runtime.retries.policy.RetryErrorType
import aws.smithy.kotlin.runtime.retries.policy.RetryPolicy
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.test.runTest
import kotlin.test.*
import kotlin.time.Duration.Companion.milliseconds

class StandardRetryStrategyTest {
@OptIn(ExperimentalCoroutinesApi::class)
Expand Down Expand Up @@ -161,53 +159,6 @@ class StandardRetryStrategyTest {
val token = bucket.lastTokenAcquired!!
assertTrue(token.nextToken!!.nextToken!!.isFailure)
}

@OptIn(ExperimentalCoroutinesApi::class)
@Test
fun testTooLongFromException() = runTest {
val options = StandardRetryStrategyOptions.Default.copy(maxTime = 1_500.milliseconds)
val bucket = RecordingTokenBucket()
val delayer = RecordingDelayer()
val retryer = StandardRetryStrategy(options, bucket, delayer)
val policy = StringRetryPolicy()

val result = runCatching {
retryer.retry(policy) {
delay(1_000)
throw ConcurrentModificationException()
}
}

assertIs<ConcurrentModificationException>(result.exceptionOrNull(), "Unexpected ${result.exceptionOrNull()}")

val token = bucket.lastTokenAcquired!!
assertTrue(token.nextToken!!.isFailure)
}

@OptIn(ExperimentalCoroutinesApi::class)
@Test
fun testTooLongFromResult() = runTest {
val options = StandardRetryStrategyOptions.Default.copy(maxTime = 1_000.milliseconds)
val bucket = RecordingTokenBucket()
val delayer = RecordingDelayer()
val retryer = StandardRetryStrategy(options, bucket, delayer)
val policy = StringRetryPolicy()

val result = runCatching {
retryer.retry(policy) {
delay(2_000)
"This will never run!"
}
}

val ex = assertIs<TimedOutException>(result.exceptionOrNull(), "Unexpected ${result.exceptionOrNull()}")
assertEquals(1, ex.attempts)
assertNull(ex.lastResponse)
assertNull(ex.lastException)

val token = bucket.lastTokenAcquired!!
assertTrue(token.isFailure)
}
}

fun block(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import kotlinx.coroutines.test.runTest
import kotlinx.serialization.SerialName
import kotlinx.serialization.Serializable
import kotlin.test.*
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds

class StandardRetryIntegrationTest {
Expand All @@ -30,7 +29,7 @@ class StandardRetryIntegrationTest {
val testCases = standardRetryIntegrationTestCases
.mapValues { Yaml.default.decodeFromString(TestCase.serializer(), it.value) }
testCases.forEach { (name, tc) ->
val options = StandardRetryStrategyOptions(maxTime = Duration.INFINITE, maxAttempts = tc.given.maxAttempts)
val options = StandardRetryStrategyOptions(maxAttempts = tc.given.maxAttempts)
val tokenBucket = StandardRetryTokenBucket(
StandardRetryTokenBucketOptions.Default.copy(
maxCapacity = tc.given.initialRetryTokens,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,24 +263,9 @@ object KotlinClientRuntimeConfigProperty {
strategy.
""".trimIndent()

val retryStrategyBlock = """
run {
val strategyOptions = StandardRetryStrategyOptions.Default
val tokenBucket = StandardRetryTokenBucket(StandardRetryTokenBucketOptions.Default)
val delayer = ExponentialBackoffWithJitter(ExponentialBackoffWithJitterOptions.Default)
StandardRetryStrategy(strategyOptions, tokenBucket, delayer)
}
""".trimIndent()
propertyType = ClientConfigPropertyType.ConstantValue(retryStrategyBlock)

additionalImports = listOf(
RuntimeTypes.Core.Retries.StandardRetryStrategy,
RuntimeTypes.Core.Retries.StandardRetryStrategyOptions,
RuntimeTypes.Core.Retries.Delay.StandardRetryTokenBucket,
RuntimeTypes.Core.Retries.Delay.StandardRetryTokenBucketOptions,
RuntimeTypes.Core.Retries.Delay.ExponentialBackoffWithJitter,
RuntimeTypes.Core.Retries.Delay.ExponentialBackoffWithJitterOptions,
)
propertyType = ClientConfigPropertyType.ConstantValue("StandardRetryStrategy()")

additionalImports = listOf(RuntimeTypes.Core.Retries.StandardRetryStrategy)
}

SdkLogMode = ClientConfigProperty {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ private fun KotlinWriter.renderRetryStrategy(wi: WaiterInfo, asValName: String)
}
write("val delay = ExponentialBackoffWithJitter(delayOptions)")
write("")
write("val waiterOptions = StandardRetryStrategyOptions(maxTime = 300.#T, maxAttempts = 20)", KotlinTypes.Time.seconds)
write("val waiterOptions = StandardRetryStrategyOptions(maxAttempts = 20)")
write("StandardRetryStrategy(waiterOptions, InfiniteTokenBucket, delay)")
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,7 @@ class Config private constructor(builder: Builder): HttpClientConfig, Idempotenc
val endpointResolver: EndpointResolver = requireNotNull(builder.endpointResolver) { "endpointResolver is a required configuration property" }
override val httpClientEngine: HttpClientEngine? = builder.httpClientEngine
override val idempotencyTokenProvider: IdempotencyTokenProvider? = builder.idempotencyTokenProvider
val retryStrategy: RetryStrategy = run {
val strategyOptions = StandardRetryStrategyOptions.Default
val tokenBucket = StandardRetryTokenBucket(StandardRetryTokenBucketOptions.Default)
val delayer = ExponentialBackoffWithJitter(ExponentialBackoffWithJitterOptions.Default)
StandardRetryStrategy(strategyOptions, tokenBucket, delayer)
}
val retryStrategy: RetryStrategy = StandardRetryStrategy()
override val sdkLogMode: SdkLogMode = builder.sdkLogMode
"""
contents.shouldContainWithDiff(expectedProps)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ class ServiceWaitersGeneratorTest {
)
val delay = ExponentialBackoffWithJitter(delayOptions)
val waiterOptions = StandardRetryStrategyOptions(maxTime = 300.seconds, maxAttempts = 20)
val waiterOptions = StandardRetryStrategyOptions(maxAttempts = 20)
StandardRetryStrategy(waiterOptions, InfiniteTokenBucket, delay)
}
""".formatForTest()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class WaiterGeneratorTest {
)
val delay = ExponentialBackoffWithJitter(delayOptions)
val waiterOptions = StandardRetryStrategyOptions(maxTime = 300.seconds, maxAttempts = 20)
val waiterOptions = StandardRetryStrategyOptions(maxAttempts = 20)
StandardRetryStrategy(waiterOptions, InfiniteTokenBucket, delay)
}
""".formatForTest()
Expand All @@ -56,7 +56,7 @@ class WaiterGeneratorTest {
)
val delay = ExponentialBackoffWithJitter(delayOptions)
val waiterOptions = StandardRetryStrategyOptions(maxTime = 300.seconds, maxAttempts = 20)
val waiterOptions = StandardRetryStrategyOptions(maxAttempts = 20)
StandardRetryStrategy(waiterOptions, InfiniteTokenBucket, delay)
}
""".formatForTest()
Expand Down

0 comments on commit a0c1772

Please sign in to comment.