Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: remove maxTime from StandardRetryStrategy #624

Merged
merged 1 commit into from
Apr 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,10 @@

package aws.smithy.kotlin.runtime.retries

import aws.smithy.kotlin.runtime.retries.delay.DelayProvider
import aws.smithy.kotlin.runtime.retries.delay.RetryCapacityExceededException
import aws.smithy.kotlin.runtime.retries.delay.RetryToken
import aws.smithy.kotlin.runtime.retries.delay.RetryTokenBucket
import aws.smithy.kotlin.runtime.retries.delay.*
import aws.smithy.kotlin.runtime.retries.policy.RetryDirective
import aws.smithy.kotlin.runtime.retries.policy.RetryPolicy
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.TimeoutCancellationException
import kotlinx.coroutines.withTimeout
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds

/**
* Implements a retry strategy utilizing backoff delayer and a token bucket for rate limiting and circuit breaking. Note
Expand All @@ -27,18 +20,16 @@ import kotlin.time.Duration.Companion.milliseconds
* @param delayProvider A delayer that can back off after the initial try to spread out the retries.
*/
class StandardRetryStrategy(
val options: StandardRetryStrategyOptions,
private val tokenBucket: RetryTokenBucket,
private val delayProvider: DelayProvider,
val options: StandardRetryStrategyOptions = StandardRetryStrategyOptions.Default,
private val tokenBucket: RetryTokenBucket = StandardRetryTokenBucket(),
private val delayProvider: DelayProvider = ExponentialBackoffWithJitter()
) : RetryStrategy {
/**
* Retry the given block of code until it's successful. Note this method throws exceptions for non-successful
* outcomes from retrying.
*/
override suspend fun <R> retry(policy: RetryPolicy<R>, block: suspend () -> R): Outcome<R> =
withTimeout(options.maxTime) {
doTryLoop(block, policy, 1, tokenBucket.acquireToken(), null)
}
doTryLoop(block, policy, 1, tokenBucket.acquireToken())

/**
* Perform a single iteration of the try loop. Execute the block of code, evaluate the result, and take action to
Expand All @@ -49,20 +40,16 @@ class StandardRetryStrategy(
* @param fromToken A [RetryToken] which grants the strategy capacity to execute a try. This token is resolved
* inside the function by calling [notifySuccess][RetryToken.notifySuccess],
* [notifyFailure][RetryToken.notifyFailure], or [scheduleRetry][RetryToken.scheduleRetry].
* @param previousResult The [Result] from the prior loop iteration. This is used in the case of a timeout to
* include in the thrown exception.
* @return The successful [Outcome] from the final try.
*/
private tailrec suspend fun <R> doTryLoop(
block: suspend () -> R,
policy: RetryPolicy<R>,
attempt: Int,
fromToken: RetryToken,
previousResult: Result<R>?,
): Outcome<R> {
val callResult = runCatching { block() }
when (val ex = callResult.exceptionOrNull()) {
is TimeoutCancellationException -> throwTimeOut(fromToken, attempt, previousResult)
is CancellationException -> throw ex
}

Expand All @@ -83,16 +70,14 @@ class StandardRetryStrategy(
fromToken.scheduleRetry(evaluation.reason)
}
}
} catch (ex: TimeoutCancellationException) {
throwTimeOut(fromToken, attempt, callResult)
} catch (ex: RetryCapacityExceededException) {
throwCapacityExceeded(ex, attempt, callResult)
} catch (ex: Throwable) {
fromToken.notifyFailure()
throw ex
}

return doTryLoop(block, policy, attempt + 1, nextToken, callResult)
return doTryLoop(block, policy, attempt + 1, nextToken)
}

/**
Expand Down Expand Up @@ -136,26 +121,6 @@ class StandardRetryStrategy(
else -> throw ex
}

/**
* Handles the termination of the retry loop because too much time has elapsed by marking the [RetryToken] as failed
* and throwing a [TimedOutException].
* @param token The [RetryToken] used in the attempt that was waiting or executing when the timeout occurred.
* @param attempt The number of attempts completed.
* @param previousResult The last result that was received (i.e., from the prior loop iteration).
*/
private suspend fun <R> throwTimeOut(token: RetryToken, attempt: Int, previousResult: Result<R>?): Nothing {
token.notifyFailure()
when (val ex = previousResult?.exceptionOrNull()) {
null -> throw TimedOutException(
"Took more than ${options.maxTime} to yield a result",
attempt,
previousResult?.getOrNull(),
previousResult?.exceptionOrNull(),
)
else -> throw ex
}
}

/**
* Handles the termination of the retry loop because too many attempts have been made by throwing a
* [TimedOutException].
Expand All @@ -178,14 +143,13 @@ class StandardRetryStrategy(

/**
* Defines configuration for a [StandardRetryStrategy].
* @param maxTime The maximum amount of time to retry.
* @param maxAttempts The maximum number of attempts to make (including the first attempt).
*/
data class StandardRetryStrategyOptions(val maxTime: Duration, val maxAttempts: Int) {
data class StandardRetryStrategyOptions(val maxAttempts: Int) {
companion object {
/**
* The default retry strategy configuration.
*/
val Default = StandardRetryStrategyOptions(maxTime = 20_000.milliseconds, maxAttempts = 3)
val Default = StandardRetryStrategyOptions(maxAttempts = 3)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ import kotlin.time.DurationUnit
*
* @param options The configuration to use for this delayer.
*/
class ExponentialBackoffWithJitter(val options: ExponentialBackoffWithJitterOptions) : DelayProvider {
class ExponentialBackoffWithJitter(
val options: ExponentialBackoffWithJitterOptions = ExponentialBackoffWithJitterOptions.Default
) : DelayProvider {
private val random = Random.Default

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ private const val MS_PER_S = 1_000
* @param clock A clock to use for time calculations.
*/
class StandardRetryTokenBucket(
val options: StandardRetryTokenBucketOptions,
val options: StandardRetryTokenBucketOptions = StandardRetryTokenBucketOptions.Default,
private val clock: Clock = Clock.System,
) : RetryTokenBucket {
internal var capacity = options.maxCapacity
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,8 @@ import aws.smithy.kotlin.runtime.retries.policy.RetryDirective
import aws.smithy.kotlin.runtime.retries.policy.RetryErrorType
import aws.smithy.kotlin.runtime.retries.policy.RetryPolicy
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.test.runTest
import kotlin.test.*
import kotlin.time.Duration.Companion.milliseconds

class StandardRetryStrategyTest {
@OptIn(ExperimentalCoroutinesApi::class)
Expand Down Expand Up @@ -161,53 +159,6 @@ class StandardRetryStrategyTest {
val token = bucket.lastTokenAcquired!!
assertTrue(token.nextToken!!.nextToken!!.isFailure)
}

@OptIn(ExperimentalCoroutinesApi::class)
@Test
fun testTooLongFromException() = runTest {
val options = StandardRetryStrategyOptions.Default.copy(maxTime = 1_500.milliseconds)
val bucket = RecordingTokenBucket()
val delayer = RecordingDelayer()
val retryer = StandardRetryStrategy(options, bucket, delayer)
val policy = StringRetryPolicy()

val result = runCatching {
retryer.retry(policy) {
delay(1_000)
throw ConcurrentModificationException()
}
}

assertIs<ConcurrentModificationException>(result.exceptionOrNull(), "Unexpected ${result.exceptionOrNull()}")

val token = bucket.lastTokenAcquired!!
assertTrue(token.nextToken!!.isFailure)
}

@OptIn(ExperimentalCoroutinesApi::class)
@Test
fun testTooLongFromResult() = runTest {
val options = StandardRetryStrategyOptions.Default.copy(maxTime = 1_000.milliseconds)
val bucket = RecordingTokenBucket()
val delayer = RecordingDelayer()
val retryer = StandardRetryStrategy(options, bucket, delayer)
val policy = StringRetryPolicy()

val result = runCatching {
retryer.retry(policy) {
delay(2_000)
"This will never run!"
}
}

val ex = assertIs<TimedOutException>(result.exceptionOrNull(), "Unexpected ${result.exceptionOrNull()}")
assertEquals(1, ex.attempts)
assertNull(ex.lastResponse)
assertNull(ex.lastException)

val token = bucket.lastTokenAcquired!!
assertTrue(token.isFailure)
}
}

fun block(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import kotlinx.coroutines.test.runTest
import kotlinx.serialization.SerialName
import kotlinx.serialization.Serializable
import kotlin.test.*
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds

class StandardRetryIntegrationTest {
Expand All @@ -30,7 +29,7 @@ class StandardRetryIntegrationTest {
val testCases = standardRetryIntegrationTestCases
.mapValues { Yaml.default.decodeFromString(TestCase.serializer(), it.value) }
testCases.forEach { (name, tc) ->
val options = StandardRetryStrategyOptions(maxTime = Duration.INFINITE, maxAttempts = tc.given.maxAttempts)
val options = StandardRetryStrategyOptions(maxAttempts = tc.given.maxAttempts)
val tokenBucket = StandardRetryTokenBucket(
StandardRetryTokenBucketOptions.Default.copy(
maxCapacity = tc.given.initialRetryTokens,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,24 +263,9 @@ object KotlinClientRuntimeConfigProperty {
strategy.
""".trimIndent()

val retryStrategyBlock = """
run {
val strategyOptions = StandardRetryStrategyOptions.Default
val tokenBucket = StandardRetryTokenBucket(StandardRetryTokenBucketOptions.Default)
val delayer = ExponentialBackoffWithJitter(ExponentialBackoffWithJitterOptions.Default)
StandardRetryStrategy(strategyOptions, tokenBucket, delayer)
}
""".trimIndent()
propertyType = ClientConfigPropertyType.ConstantValue(retryStrategyBlock)

additionalImports = listOf(
RuntimeTypes.Core.Retries.StandardRetryStrategy,
RuntimeTypes.Core.Retries.StandardRetryStrategyOptions,
RuntimeTypes.Core.Retries.Delay.StandardRetryTokenBucket,
RuntimeTypes.Core.Retries.Delay.StandardRetryTokenBucketOptions,
RuntimeTypes.Core.Retries.Delay.ExponentialBackoffWithJitter,
RuntimeTypes.Core.Retries.Delay.ExponentialBackoffWithJitterOptions,
)
propertyType = ClientConfigPropertyType.ConstantValue("StandardRetryStrategy()")

additionalImports = listOf(RuntimeTypes.Core.Retries.StandardRetryStrategy)
}

SdkLogMode = ClientConfigProperty {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ private fun KotlinWriter.renderRetryStrategy(wi: WaiterInfo, asValName: String)
}
write("val delay = ExponentialBackoffWithJitter(delayOptions)")
write("")
write("val waiterOptions = StandardRetryStrategyOptions(maxTime = 300.#T, maxAttempts = 20)", KotlinTypes.Time.seconds)
write("val waiterOptions = StandardRetryStrategyOptions(maxAttempts = 20)")
write("StandardRetryStrategy(waiterOptions, InfiniteTokenBucket, delay)")
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,7 @@ class Config private constructor(builder: Builder): HttpClientConfig, Idempotenc
val endpointResolver: EndpointResolver = requireNotNull(builder.endpointResolver) { "endpointResolver is a required configuration property" }
override val httpClientEngine: HttpClientEngine? = builder.httpClientEngine
override val idempotencyTokenProvider: IdempotencyTokenProvider? = builder.idempotencyTokenProvider
val retryStrategy: RetryStrategy = run {
val strategyOptions = StandardRetryStrategyOptions.Default
val tokenBucket = StandardRetryTokenBucket(StandardRetryTokenBucketOptions.Default)
val delayer = ExponentialBackoffWithJitter(ExponentialBackoffWithJitterOptions.Default)
StandardRetryStrategy(strategyOptions, tokenBucket, delayer)
}
val retryStrategy: RetryStrategy = StandardRetryStrategy()
override val sdkLogMode: SdkLogMode = builder.sdkLogMode
"""
contents.shouldContainWithDiff(expectedProps)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ class ServiceWaitersGeneratorTest {
)
val delay = ExponentialBackoffWithJitter(delayOptions)

val waiterOptions = StandardRetryStrategyOptions(maxTime = 300.seconds, maxAttempts = 20)
val waiterOptions = StandardRetryStrategyOptions(maxAttempts = 20)
StandardRetryStrategy(waiterOptions, InfiniteTokenBucket, delay)
}
""".formatForTest()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class WaiterGeneratorTest {
)
val delay = ExponentialBackoffWithJitter(delayOptions)

val waiterOptions = StandardRetryStrategyOptions(maxTime = 300.seconds, maxAttempts = 20)
val waiterOptions = StandardRetryStrategyOptions(maxAttempts = 20)
StandardRetryStrategy(waiterOptions, InfiniteTokenBucket, delay)
}
""".formatForTest()
Expand All @@ -56,7 +56,7 @@ class WaiterGeneratorTest {
)
val delay = ExponentialBackoffWithJitter(delayOptions)

val waiterOptions = StandardRetryStrategyOptions(maxTime = 300.seconds, maxAttempts = 20)
val waiterOptions = StandardRetryStrategyOptions(maxAttempts = 20)
StandardRetryStrategy(waiterOptions, InfiniteTokenBucket, delay)
}
""".formatForTest()
Expand Down