Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(rt): add support for HTTP proxies #665

Merged
merged 18 commits into from
Jun 20, 2022
Merged
8 changes: 8 additions & 0 deletions .changes/32235989-e284-4cec-aafc-0f73b941cfa1.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"id": "32235989-e284-4cec-aafc-0f73b941cfa1",
"type": "feature",
"description": "Add support for HTTP proxies",
"issues": [
"awslabs/smithy-kotlin#494"
]
}
1 change: 1 addition & 0 deletions gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ kotestVersion=5.3.0
kotlinCompileTestingVersion=1.4.8
jacocoVersion=0.8.8
kotlinxBenchmarkVersion=0.4.2
testContainersVersion=1.17.2

# serialization
kamlVersion=0.36.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,10 @@ internal abstract class AbstractBufferedReadChannel(

val segment = newReadableSegment(bytesIn)
val result = segments.trySend(segment)

// nothing to do, channel is closed no more data is expected
if (result.isClosed) return

check(result.isSuccess) { "failed to queue segment" }

// advertise bytes available
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import aws.smithy.kotlin.runtime.client.ExecutionContext
import aws.smithy.kotlin.runtime.crt.SdkDefaultIO
import aws.smithy.kotlin.runtime.http.engine.HttpClientEngine
import aws.smithy.kotlin.runtime.http.engine.HttpClientEngineBase
import aws.smithy.kotlin.runtime.http.engine.ProxyConfig
import aws.smithy.kotlin.runtime.http.engine.callContext
import aws.smithy.kotlin.runtime.http.operation.withContext
import aws.smithy.kotlin.runtime.http.request.HttpRequest
Expand Down Expand Up @@ -76,7 +77,9 @@ public class CrtHttpEngine(public val config: CrtHttpEngineConfig) : HttpClientE
override suspend fun roundTrip(context: ExecutionContext, request: HttpRequest): HttpCall {
val callContext = callContext()
val reqLogger = logger.withContext(context)
val manager = getManagerForUri(request.uri)

val proxyConfig = config.proxySelector.select(request.url)
val manager = getManagerForUri(request.uri, proxyConfig)

// LIFETIME: connection will be released back to the pool/manager when
// the response completes OR on exception (both handled by the completion handler registered on the stream
Expand Down Expand Up @@ -110,9 +113,22 @@ public class CrtHttpEngine(public val config: CrtHttpEngineConfig) : HttpClientE
customTlsContext?.close()
}

private suspend fun getManagerForUri(uri: Uri): HttpClientConnectionManager = mutex.withLock {
private suspend fun getManagerForUri(uri: Uri, proxyConfig: ProxyConfig): HttpClientConnectionManager = mutex.withLock {
connManagers.getOrPut(uri.authority) {
HttpClientConnectionManager(options.apply { this.uri = uri }.build())
val connOpts = options.apply {
this.uri = uri
proxyOptions = when (proxyConfig) {
is ProxyConfig.Http -> HttpProxyOptions(
proxyConfig.url.host,
proxyConfig.url.port,
authUsername = proxyConfig.url.userInfo?.username,
authPassword = proxyConfig.url.userInfo?.password,
authType = if (proxyConfig.url.userInfo != null) HttpProxyAuthorizationType.Basic else HttpProxyAuthorizationType.None
)
else -> null
}
}.build()
HttpClientConnectionManager(connOpts)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ internal class HttpEngineEventListener(
}

override fun proxySelectEnd(call: Call, url: HttpUrl, proxies: List<Proxy>) {
traceCall(call) { "proxy select end: url=$url" }
traceCall(call) { "proxy select end: url=$url; proxies=$proxies" }
}

override fun requestBodyStart(call: Call) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,5 +98,8 @@ private fun OkHttpEngineConfig.buildClient(): OkHttpClient {
}
protocols(protocols)
}

proxySelector(OkHttpProxySelector(config.proxySelector))
proxyAuthenticator(OkHttpProxyAuthenticator(config.proxySelector))
}.build()
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,23 @@
package aws.smithy.kotlin.runtime.http.engine.okhttp

import aws.smithy.kotlin.runtime.client.ExecutionContext
import aws.smithy.kotlin.runtime.http.HttpBody
import aws.smithy.kotlin.runtime.http.HttpStatusCode
import aws.smithy.kotlin.runtime.http.*
import aws.smithy.kotlin.runtime.http.engine.ProxyConfig
import aws.smithy.kotlin.runtime.http.request.HttpRequest
import aws.smithy.kotlin.runtime.http.response.HttpResponse
import aws.smithy.kotlin.runtime.io.SdkByteChannel
import aws.smithy.kotlin.runtime.io.SdkByteReadChannel
import aws.smithy.kotlin.runtime.logging.Logger
import kotlinx.coroutines.*
import okhttp3.Authenticator
import okhttp3.Credentials
import okhttp3.RequestBody.Companion.toRequestBody
import okhttp3.Route
import okhttp3.internal.http.HttpMethod
import java.io.IOException
import java.net.*
import kotlin.coroutines.CoroutineContext
import aws.smithy.kotlin.runtime.http.engine.ProxySelector as SdkProxySelector
import okhttp3.Request as OkHttpRequest
import okhttp3.Response as OkHttpResponse

Expand Down Expand Up @@ -112,3 +119,59 @@ internal fun CoroutineContext.derivedName(name: String): CoroutineName {
val existing = get(CoroutineName)?.name ?: return CoroutineName(name)
return CoroutineName("$existing:$name")
}

internal class OkHttpProxyAuthenticator(
private val selector: SdkProxySelector,
) : Authenticator {
override fun authenticate(route: Route?, response: okhttp3.Response): okhttp3.Request? {
if (response.request.header("Proxy-Authorization") != null) {
// Give up, we've already failed to authenticate.
return null
}

val url = response.request.url.let {
Url(scheme = Protocol(it.scheme, it.port), host = it.host, port = it.port)
}

// NOTE: We will end up querying the proxy selector twice. We do this to allow
// the url.userInfo be used for Basic auth scheme. Supporting other auth schemes
// will require defining dedicated proxy auth configuration APIs that work
// on a per/request basis (much like the okhttp interface we are implementing here...)
val userInfo = when (val proxyConfig = selector.select(url)) {
is ProxyConfig.Http -> proxyConfig.url.userInfo
else -> null
} ?: return null

for (challenge in response.challenges()) {
if (challenge.scheme.lowercase() == "okhttp-preemptive" || challenge.scheme == "Basic") {
return response.request.newBuilder()
.header("Proxy-Authorization", Credentials.basic(userInfo.username, userInfo.password))
.build()
}
}

return null
}
}

internal class OkHttpProxySelector(
private val sdkSelector: SdkProxySelector
) : ProxySelector() {
override fun select(uri: URI?): List<Proxy> {
if (uri == null) return emptyList()
val url = uri.toUrl()

return when (val proxyConfig = sdkSelector.select(url)) {
is ProxyConfig.Http -> {
val okProxy = Proxy(Proxy.Type.HTTP, InetSocketAddress(proxyConfig.url.host, proxyConfig.url.port))
return listOf(okProxy)
}
else -> emptyList()
}
}

override fun connectFailed(uri: URI?, sa: SocketAddress?, ioe: IOException?) {
val logger = Logger.getLogger<OkHttpProxySelector>()
logger.error { "failed to connect to proxy: uri=$uri; socketAddress: $sa; exception: $ioe" }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ extra["skipPublish"] = true
val coroutinesVersion: String by project
val ktorVersion: String by project
val slf4jVersion: String by project
val testContainersVersion: String by project

kotlin {
sourceSets {
Expand Down Expand Up @@ -40,6 +41,13 @@ kotlin {
}
}

jvmTest {
dependencies {
implementation("org.testcontainers:testcontainers:$testContainersVersion")
implementation("org.testcontainers:junit-jupiter:$testContainersVersion")
}
}

all {
languageSettings.optIn("aws.smithy.kotlin.runtime.util.InternalApi")
}
Expand Down Expand Up @@ -99,6 +107,13 @@ val testTasks = listOf("allTests", "jvmTest")
}
}

tasks.jvmTest {
// set test environment for proxy tests
systemProperty("MITM_PROXY_SCRIPTS_ROOT", projectDir.resolve("proxy-scripts").absolutePath)
val enableProxyTestsProp = "aws.test.http.enableProxyTests"
systemProperty(enableProxyTestsProp, System.getProperties().getOrDefault(enableProxyTestsProp, "true"))
}

gradle.buildFinished {
startTestServer.stop()
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package aws.smithy.kotlin.runtime.http.test.util
import aws.smithy.kotlin.runtime.http.SdkHttpClient
import aws.smithy.kotlin.runtime.http.Url
import aws.smithy.kotlin.runtime.http.engine.HttpClientEngine
import aws.smithy.kotlin.runtime.http.engine.HttpClientEngineConfig
import aws.smithy.kotlin.runtime.http.request.HttpRequestBuilder
import aws.smithy.kotlin.runtime.http.request.url
import aws.smithy.kotlin.runtime.http.sdkHttpClient
Expand All @@ -24,28 +25,32 @@ private val TEST_SERVER = Url.parse("http://127.0.0.1:8082")
/**
* Abstract base class that all engine test suite test classes should inherit from.
*/
abstract class AbstractEngineTest() {
abstract class AbstractEngineTest {

/**
* Build a test that will run against each engine in the test suite.
*
* Concrete implementations for each KMP target are responsible for loading the engines
* supported by that platform and executing the test
*/
fun testEngines(block: EngineTestBuilder.() -> Unit) {
engines().forEach { engine ->
sdkHttpClient(engine, manageEngine = true).use { client ->
testWithClient(client, block = block)
fun testEngines(skipEngines: Set<String> = emptySet(), block: EngineTestBuilder.() -> Unit) {
val builder = EngineTestBuilder().apply(block)
engineFactories()
.filter { it.name !in skipEngines }
.forEach { engineFactory ->
val engine = engineFactory.create(builder.engineConfig)
sdkHttpClient(engine, manageEngine = true).use { client ->
testWithClient(client, builder = builder)
}
}
}
}
}

/**
* Concrete implementations for each KMP target are responsible for loading the engines
* supported by that platform and executing the test
*/
internal expect fun engines(): List<HttpClientEngine>
internal expect fun engineFactories(): List<TestEngineFactory>

/**
* Container for current engine test environment
Expand All @@ -60,10 +65,16 @@ data class TestEnvironment(val testServer: Url, val coroutineId: Int, val attemp
* Configure the test
*/
class EngineTestBuilder {
/**
* Lambda function invoked to configure the [HttpClientEngineConfig] to use for the test. If not specified
* [HttpClientEngineConfig.Default] is used
*/
var engineConfig: HttpClientEngineConfig.Builder.() -> Unit = {}

/**
* Lambda function that is invoked with the current test environment and an [SdkHttpClient]
* configured with an engine loaded by [AbstractEngineTest]. Invoke calls against test routes and make
* assertions here
* assertions here. This will potentially be invoked multiple times (once for each engine supported by a platform).
*/
var test: (suspend (env: TestEnvironment, client: SdkHttpClient) -> Unit) = { _, _ -> error("engine test not configured") }

Expand All @@ -84,9 +95,8 @@ class EngineTestBuilder {
fun testWithClient(
client: SdkHttpClient,
timeout: Duration = 60.seconds,
block: suspend EngineTestBuilder.() -> Unit
builder: EngineTestBuilder
): Unit = runBlockingTest(timeout = timeout) {
val builder = EngineTestBuilder().apply { block() }
runConcurrently(builder.concurrency) { coroutineId ->
repeat(builder.repeat) { attempt ->
val env = TestEnvironment(TEST_SERVER, coroutineId, attempt)
Expand Down Expand Up @@ -121,3 +131,20 @@ fun HttpRequestBuilder.testSetup(env: TestEnvironment) {
url(env.testServer)
headers.append("Host", "${env.testServer.host}:${env.testServer.port}")
}

fun EngineTestBuilder.engineConfig(block: HttpClientEngineConfig.Builder.() -> Unit) {
engineConfig = block
}

internal data class TestEngineFactory(
/**
* Unique name for the engine
*/
val name: String,
/**
* Configure a new [HttpClientEngine] instance and return it
*/
val configure: (HttpClientEngineConfig.Builder.() -> Unit) -> HttpClientEngine
) {
fun create(block: HttpClientEngineConfig.Builder.() -> Unit): HttpClientEngine = configure(block)
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ package aws.smithy.kotlin.runtime.http.test
import aws.smithy.kotlin.runtime.http.HttpStatusCode
import aws.smithy.kotlin.runtime.http.readAll
import aws.smithy.kotlin.runtime.http.request.HttpRequest
import aws.smithy.kotlin.runtime.http.request.url
import aws.smithy.kotlin.runtime.http.response.complete
import aws.smithy.kotlin.runtime.http.test.util.AbstractEngineTest
import aws.smithy.kotlin.runtime.http.test.util.test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import aws.smithy.kotlin.runtime.http.HttpStatusCode
import aws.smithy.kotlin.runtime.http.content.ByteArrayContent
import aws.smithy.kotlin.runtime.http.request.HttpRequest
import aws.smithy.kotlin.runtime.http.request.headers
import aws.smithy.kotlin.runtime.http.request.url
import aws.smithy.kotlin.runtime.http.response.complete
import aws.smithy.kotlin.runtime.http.test.util.AbstractEngineTest
import aws.smithy.kotlin.runtime.http.test.util.test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,18 @@
package aws.smithy.kotlin.runtime.http.test.util

import aws.smithy.kotlin.runtime.http.engine.DefaultHttpEngine
import aws.smithy.kotlin.runtime.http.engine.HttpClientEngine
import aws.smithy.kotlin.runtime.http.engine.crt.CrtHttpEngine
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeout
import kotlin.coroutines.CoroutineContext
import kotlin.time.Duration

internal actual fun engines(): List<HttpClientEngine> =
internal actual fun engineFactories(): List<TestEngineFactory> =
listOf(
DefaultHttpEngine(),
CrtHttpEngine(),
KtorOkHttpEngine()
TestEngineFactory("DefaultHttpEngine", ::DefaultHttpEngine),
TestEngineFactory("CrtHttpEngine") { CrtHttpEngine(it) },
TestEngineFactory("KtorEngine") { KtorOkHttpEngine(it) }
)

internal actual fun runBlockingTest(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@ import aws.smithy.kotlin.runtime.http.engine.AlpnId
import aws.smithy.kotlin.runtime.http.engine.HttpClientEngine
import aws.smithy.kotlin.runtime.http.engine.HttpClientEngineConfig
import aws.smithy.kotlin.runtime.http.engine.ktor.KtorEngine
import io.ktor.client.engine.*
import io.ktor.client.engine.okhttp.*
import okhttp3.ConnectionPool
import okhttp3.Protocol
import java.util.concurrent.TimeUnit
import kotlin.time.toJavaDuration

// Example KtorEngine that can be used to verify the implementation against the test suite
internal fun KtorOkHttpEngine(config: HttpClientEngineConfig = HttpClientEngineConfig.Default): HttpClientEngine {
val okHttpEngine = OkHttp.create {
config {
Expand Down Expand Up @@ -43,3 +45,9 @@ internal fun KtorOkHttpEngine(config: HttpClientEngineConfig = HttpClientEngineC

return KtorEngine(okHttpEngine)
}

internal fun KtorOkHttpEngine(block: HttpClientEngineConfig.Builder.() -> Unit): HttpClientEngine {
val builder = HttpClientEngineConfig.Builder().apply(block)
val config = HttpClientEngineConfig(builder)
return KtorOkHttpEngine(config)
}
Loading