diff --git a/aws-spi-pekko-http/src/it/resources/application.conf b/aws-spi-pekko-http/src/it/resources/application.conf
index 8a2cb9651..a21b38f23 100644
--- a/aws-spi-pekko-http/src/it/resources/application.conf
+++ b/aws-spi-pekko-http/src/it/resources/application.conf
@@ -1,3 +1,6 @@
# SPDX-License-Identifier: Apache-2.0
pekko.http.client.parsing.max-content-length = 15m
+pekko.http.client.log-unencrypted-network-bytes = 1000
+pekko.http.client.http2.log-frames = true
+pekko.loglevel = "DEBUG"
diff --git a/aws-spi-pekko-http/src/it/scala/org/apache/pekko/stream/connectors/awsspi/kinesis/KinesisITTest.scala b/aws-spi-pekko-http/src/it/scala/org/apache/pekko/stream/connectors/awsspi/kinesis/KinesisITTest.scala
index 348987b62..ae6d7ad47 100644
--- a/aws-spi-pekko-http/src/it/scala/org/apache/pekko/stream/connectors/awsspi/kinesis/KinesisITTest.scala
+++ b/aws-spi-pekko-http/src/it/scala/org/apache/pekko/stream/connectors/awsspi/kinesis/KinesisITTest.scala
@@ -31,25 +31,37 @@ class KinesisITTest extends AnyWordSpec with Matchers with TestBase {
def withClient(testCode: KinesisAsyncClient => Any): Any = {
- val pekkoClient = new PekkoHttpAsyncHttpService().createAsyncHttpClientFactory().build()
+ // TODO use pekkoClientBuilder instead of httpClient in other withClient methods
+ val pekkoClientBuilder = new PekkoHttpAsyncHttpService().createAsyncHttpClientFactory()
val client = KinesisAsyncClient
.builder()
.credentialsProvider(credentialProviderChain)
.region(defaultRegion)
- .httpClient(pekkoClient)
+ .httpClientBuilder(pekkoClientBuilder)
.build()
try
testCode(client)
finally { // clean up
- pekkoClient.close()
client.close()
}
}
"Kinesis async client" should {
+ "list streams" in withClient { implicit client =>
+ val result = client.listStreams().join()
+ result.streamNames() should not be null
+ }
+ "list streams in parallel" in withClient { implicit client =>
+ // if the number of requests is changed from 5 to 6, then the test will be stuck for 60s and then complete correctly
+ val x = for (_ <- 1 to 5) yield {
+ client.listStreams()
+ }
+ x.foreach(_.join().streamNames() should not be null)
+ }
+
"use a data stream: create + put + get + delete" in withClient { implicit client =>
val streamName = "aws-spi-test-" + Random.alphanumeric.take(10).filterNot(_.isUpper).mkString
val data = "123"
diff --git a/aws-spi-pekko-http/src/main/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClient.scala b/aws-spi-pekko-http/src/main/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClient.scala
index 9aacfd2b5..c0f29afed 100644
--- a/aws-spi-pekko-http/src/main/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClient.scala
+++ b/aws-spi-pekko-http/src/main/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClient.scala
@@ -18,10 +18,9 @@
package org.apache.pekko.stream.connectors.awsspi
import java.util.concurrent.{ CompletableFuture, TimeUnit }
-
import org.apache.pekko
import pekko.actor.{ ActorSystem, ClassicActorSystemProvider }
-import pekko.http.scaladsl.Http
+import pekko.http.scaladsl._
import pekko.http.scaladsl.model.HttpHeader.ParsingResult
import pekko.http.scaladsl.model.HttpHeader.ParsingResult.Ok
import pekko.http.scaladsl.model.MediaType.Compressible
@@ -29,43 +28,97 @@ import pekko.http.scaladsl.model.RequestEntityAcceptance.Expected
import pekko.http.scaladsl.model._
import pekko.http.scaladsl.model.headers.{ `Content-Length`, `Content-Type` }
import pekko.http.scaladsl.settings.ConnectionPoolSettings
-import pekko.stream.scaladsl.Source
-import pekko.stream.{ Materializer, SystemMaterializer }
+import pekko.stream.scaladsl._
+import pekko.stream.{ Materializer, OverflowStrategy, SystemMaterializer }
import pekko.util.ByteString
-import pekko.util.OptionConverters
+import pekko.util.OptionConverters._
+import pekko.util.JavaDurationConverters._
import org.slf4j.LoggerFactory
import software.amazon.awssdk.http.async._
-import software.amazon.awssdk.http.SdkHttpRequest
+import software.amazon.awssdk.http.{ Protocol, SdkHttpConfigurationOption, SdkHttpRequest }
import software.amazon.awssdk.utils.AttributeMap
+import java.security.SecureRandom
+import java.security.cert.X509Certificate
+import javax.net.ssl._
import scala.collection.immutable
import scala.concurrent.duration.Duration
-import scala.concurrent.{ Await, ExecutionContext }
+import scala.concurrent.{ Await, ExecutionContext, Future, Promise }
-class PekkoHttpClient(shutdownHandle: () => Unit, connectionSettings: ConnectionPoolSettings)(implicit
+class PekkoHttpClient(
+ shutdownHandle: () => Unit,
+ protocol: HttpProtocol,
+ private[awsspi] val connectionSettings: ConnectionPoolSettings,
+ private[awsspi] val connectionContext: HttpsConnectionContext
+)(
+ implicit
actorSystem: ActorSystem,
ec: ExecutionContext,
mat: Materializer) extends SdkAsyncHttpClient {
import PekkoHttpClient._
- lazy val runner = new RequestRunner()
+ private lazy val runner = new RequestRunner()
+ private lazy val http2connectionFlows =
+ new java.util.concurrent.ConcurrentHashMap[Uri, SourceQueueWithComplete[HttpRequest]]()
override def execute(request: AsyncExecuteRequest): CompletableFuture[Void] = {
- val pekkoHttpRequest = toPekkoRequest(request.request(), request.requestContentPublisher())
- runner.run(
- () => Http().singleRequest(pekkoHttpRequest, settings = connectionSettings),
- request.responseHandler())
+
+ logger.debug(s"Executing with protocol: $protocol")
+
+ if (protocol == HttpProtocols.`HTTP/2.0`) {
+ val useTls = request.request().protocol() == "https"
+ val akkaHttpRequest = toPekkoRequest(/*protocol, */ request.request(), request.requestContentPublisher())
+ val uri = akkaHttpRequest.effectiveUri(securedConnection = useTls)
+ val queue = http2connectionFlows.computeIfAbsent(uri,
+ _ => {
+ val baseConnection = Http()
+ .connectionTo(request.request().host())
+ .toPort(request.request().port())
+ .withCustomHttpsConnectionContext(connectionContext)
+ val http2client = request.request().protocol() match {
+ case "http" => baseConnection.managedPersistentHttp2WithPriorKnowledge()
+ case "https" => baseConnection.managedPersistentHttp2()
+ case _ => throw new IllegalArgumentException("Unsupported protocol")
+ }
+ Source
+ .queue[HttpRequest](4242, OverflowStrategy.fail)
+ .via(http2client)
+ .to(Sink.foreach { res =>
+ res.attribute(ResponsePromise.Key).get.promise.trySuccess(res)
+ })
+ .run()
+ })
+
+ val dispatch: HttpRequest => Future[HttpResponse] = req => {
+ val p = Promise[HttpResponse]()
+ queue.offer(req.addAttribute(ResponsePromise.Key, ResponsePromise(p))).flatMap(_ => p.future)
+ }
+
+ runner.run(
+ () => dispatch(akkaHttpRequest),
+ request.responseHandler()
+ )
+ } else {
+ runner.run(
+ () => {
+ val pekkoHttpRequest = toPekkoRequest(request.request(), request.requestContentPublisher())
+ Http().singleRequest(pekkoHttpRequest, settings = connectionSettings, connectionContext = connectionContext)
+ },
+ request.responseHandler())
+ }
}
- override def close(): Unit =
+ override def close(): Unit = {
+ http2connectionFlows.values().iterator().forEachRemaining(_.complete())
shutdownHandle()
+ }
override def clientName(): String = "pekko-http"
}
object PekkoHttpClient {
- val logger = LoggerFactory.getLogger(this.getClass)
+ private val logger = LoggerFactory.getLogger(this.getClass)
private[awsspi] def toPekkoRequest(request: SdkHttpRequest,
contentPublisher: SdkHttpContentPublisher): HttpRequest = {
@@ -84,8 +137,7 @@ object PekkoHttpClient {
contentType: ContentType,
contentPublisher: SdkHttpContentPublisher): RequestEntity =
method.requestEntityAcceptance match {
- case Expected =>
- OptionConverters.toScala(contentPublisher.contentLength()) match {
+ case Expected => contentPublisher.contentLength().toScala match {
case Some(length) =>
HttpEntity(contentType, length, Source.fromPublisher(contentPublisher).map(ByteString(_)))
case None => HttpEntity(contentType, Source.fromPublisher(contentPublisher).map(ByteString(_)))
@@ -151,18 +203,49 @@ object PekkoHttpClient {
else throw new RuntimeException(s"Could not parse custom content type '$contentTypeStr'.")
}
+ private[awsspi] def buildConnectionPoolSettings(
+ base: ConnectionPoolSettings, attributeMap: AttributeMap): ConnectionPoolSettings = {
+ def zeroToInfinite(duration: java.time.Duration): scala.concurrent.duration.Duration =
+ if (duration.isZero) scala.concurrent.duration.Duration.Inf
+ else duration.asScala
+
+ base
+ .withUpdatedConnectionSettings(s =>
+ s.withConnectingTimeout(attributeMap.get(SdkHttpConfigurationOption.CONNECTION_TIMEOUT).asScala)
+ .withIdleTimeout(attributeMap.get(SdkHttpConfigurationOption.CONNECTION_MAX_IDLE_TIMEOUT).asScala))
+ .withMaxConnections(attributeMap.get(SdkHttpConfigurationOption.MAX_CONNECTIONS).intValue())
+ .withMaxConnectionLifetime(zeroToInfinite(attributeMap.get(SdkHttpConfigurationOption.CONNECTION_TIME_TO_LIVE)))
+ }
+
def builder() = PekkoHttpClientBuilder()
case class PekkoHttpClientBuilder(private val actorSystem: Option[ActorSystem] = None,
private val executionContext: Option[ExecutionContext] = None,
- private val connectionPoolSettings: Option[ConnectionPoolSettings] = None)
+ private val connectionPoolSettings: Option[ConnectionPoolSettings] = None,
+ private val connectionPoolSettingsBuilder: (ConnectionPoolSettings, AttributeMap) => ConnectionPoolSettings =
+ (c, _) => c)
extends SdkAsyncHttpClient.Builder[PekkoHttpClientBuilder] {
- def buildWithDefaults(attributeMap: AttributeMap): SdkAsyncHttpClient = {
+ def buildWithDefaults(serviceDefaults: AttributeMap): SdkAsyncHttpClient = {
implicit val as = actorSystem.getOrElse(ActorSystem("aws-pekko-http"))
implicit val ec = executionContext.getOrElse(as.dispatcher)
val mat: Materializer = SystemMaterializer(as).materializer
- val cps = connectionPoolSettings.getOrElse(ConnectionPoolSettings(as))
+ println("serviceDefaults: " + serviceDefaults)
+
+ val resolvedOptions = serviceDefaults.merge(SdkHttpConfigurationOption.GLOBAL_HTTP_DEFAULTS);
+
+ val protocol = toProtocol(resolvedOptions.get(SdkHttpConfigurationOption.PROTOCOL))
+
+ val cps = connectionPoolSettingsBuilder(
+ connectionPoolSettings.getOrElse(ConnectionPoolSettings(as)),
+ resolvedOptions
+ )
+
+ val connectionContext =
+ if (resolvedOptions.get(SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES).booleanValue())
+ ConnectionContext.httpsClient(createInsecureSslEngine _)
+ else ConnectionContext.httpsClient(SSLContext.getDefault)
+
val shutdownhandleF = () => {
if (actorSystem.isEmpty) {
Await.result(Http().shutdownAllConnectionPools().flatMap(_ => as.terminate()),
@@ -170,8 +253,9 @@ object PekkoHttpClient {
}
()
}
- new PekkoHttpClient(shutdownhandleF, cps)(as, ec, mat)
+ new PekkoHttpClient(shutdownhandleF, protocol, cps, connectionContext)(as, ec, mat)
}
+
def withActorSystem(actorSystem: ActorSystem): PekkoHttpClientBuilder = copy(actorSystem = Some(actorSystem))
def withActorSystem(actorSystem: ClassicActorSystemProvider): PekkoHttpClientBuilder =
copy(actorSystem = Some(actorSystem.classicSystem))
@@ -179,6 +263,12 @@ object PekkoHttpClient {
copy(executionContext = Some(executionContext))
def withConnectionPoolSettings(connectionPoolSettings: ConnectionPoolSettings): PekkoHttpClientBuilder =
copy(connectionPoolSettings = Some(connectionPoolSettings))
+ def withConnectionPoolSettingsBuilder(
+ connectionPoolSettingsBuilder: (ConnectionPoolSettings, AttributeMap) => ConnectionPoolSettings
+ ): PekkoHttpClientBuilder =
+ copy(connectionPoolSettingsBuilder = connectionPoolSettingsBuilder)
+ def withConnectionPoolSettingsBuilderFromAttributeMap(): PekkoHttpClientBuilder =
+ copy(connectionPoolSettingsBuilder = buildConnectionPoolSettings)
}
lazy val xAmzJson = ContentType(MediaType.customBinary("application", "x-amz-json-1.0", Compressible))
@@ -195,4 +285,40 @@ object PekkoHttpClient {
"application/x-www-form-urlencoded; charset-UTF-8" -> formUrlEncoded,
"application/x-www-form-urlencoded" -> formUrlEncoded,
"application/xml" -> applicationXml)
+
+ private def toProtocol(protocol: Protocol): HttpProtocol = protocol match {
+ case Protocol.HTTP2 => HttpProtocols.`HTTP/2.0`
+ case Protocol.HTTP1_1 => HttpProtocols.`HTTP/1.1`
+ case _ => throw new IllegalArgumentException(s"Unsupported protocol: $protocol")
+ }
+
+ private def createInsecureSslEngine(host: String, port: Int): SSLEngine = {
+ val engine = createTrustfulSslContext().createSSLEngine(host, port)
+ engine.setUseClientMode(true)
+
+ // WARNING: this creates an SSL Engine without enabling endpoint identification/verification procedures
+ // Disabling host name verification is a very bad idea, please don't unless you have a very good reason to.
+ // When in doubt, use the `ConnectionContext.httpsClient` that takes an `SSLContext` instead, or enable with:
+ // engine.setSSLParameters({
+ // val params = engine.getSSLParameters
+ // params.setEndpointIdentificationAlgorithm("https")
+ // params
+ // })
+
+ engine
+ }
+
+ private def createTrustfulSslContext(): SSLContext = {
+ object NoCheckX509TrustManager extends X509TrustManager {
+ override def checkClientTrusted(chain: Array[X509Certificate], authType: String) = ()
+
+ override def checkServerTrusted(chain: Array[X509Certificate], authType: String) = ()
+
+ override def getAcceptedIssuers = Array[X509Certificate]()
+ }
+
+ val context = SSLContext.getInstance("TLS")
+ context.init(Array[KeyManager](), Array(NoCheckX509TrustManager), new SecureRandom())
+ context
+ }
}
diff --git a/aws-spi-pekko-http/src/main/scala/org/apache/pekko/stream/connectors/awsspi/RequestRunner.scala b/aws-spi-pekko-http/src/main/scala/org/apache/pekko/stream/connectors/awsspi/RequestRunner.scala
index fa1335bce..ac01b0edb 100644
--- a/aws-spi-pekko-http/src/main/scala/org/apache/pekko/stream/connectors/awsspi/RequestRunner.scala
+++ b/aws-spi-pekko-http/src/main/scala/org/apache/pekko/stream/connectors/awsspi/RequestRunner.scala
@@ -26,7 +26,6 @@ import pekko.http.scaladsl.model.headers.{ `Content-Length`, `Content-Type` }
import pekko.stream.Materializer
import pekko.stream.scaladsl.{ Keep, Sink }
import pekko.util.FutureConverters
-import org.slf4j.LoggerFactory
import software.amazon.awssdk.http.SdkHttpFullResponse
import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler
@@ -34,10 +33,9 @@ import scala.concurrent.{ ExecutionContext, Future }
class RequestRunner()(implicit ec: ExecutionContext, mat: Materializer) {
- val logger = LoggerFactory.getLogger(this.getClass)
-
def run(runRequest: () => Future[HttpResponse], handler: SdkAsyncHttpResponseHandler): CompletableFuture[Void] = {
- val result = runRequest().flatMap { response =>
+ // Future.unit.flatMap(expr) is a scala 2.12 equivalent of Future.delegate(expr)
+ val result = Future.unit.flatMap(_ => runRequest()).flatMap { response =>
handler.onHeaders(toSdkHttpFullResponse(response))
val (complete, publisher) = response.entity.dataBytes
diff --git a/aws-spi-pekko-http/src/test/java/org/apache/pekko/stream/connectors/awsspi/S3Test.java b/aws-spi-pekko-http/src/test/java/org/apache/pekko/stream/connectors/awsspi/s3/S3Test.java
similarity index 98%
rename from aws-spi-pekko-http/src/test/java/org/apache/pekko/stream/connectors/awsspi/S3Test.java
rename to aws-spi-pekko-http/src/test/java/org/apache/pekko/stream/connectors/awsspi/s3/S3Test.java
index 528b285dd..3261e21ea 100644
--- a/aws-spi-pekko-http/src/test/java/org/apache/pekko/stream/connectors/awsspi/S3Test.java
+++ b/aws-spi-pekko-http/src/test/java/org/apache/pekko/stream/connectors/awsspi/s3/S3Test.java
@@ -21,7 +21,6 @@
import org.apache.pekko.stream.connectors.awsspi.PekkoHttpAsyncHttpService;
import org.junit.Rule;
import org.junit.Test;
-import org.scalatestplus.junit.JUnitSuite;
import org.testcontainers.containers.GenericContainer;
import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
import software.amazon.awssdk.core.ResponseBytes;
@@ -44,7 +43,7 @@
import static org.junit.Assert.assertEquals;
-public class S3Test extends JUnitSuite {
+public class S3Test {
private static final String AB = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
private static SecureRandom rnd = new SecureRandom();
diff --git a/aws-spi-pekko-http/src/test/resources/logback-test.xml b/aws-spi-pekko-http/src/test/resources/logback-test.xml
index 5d1d29de5..bf6018724 100644
--- a/aws-spi-pekko-http/src/test/resources/logback-test.xml
+++ b/aws-spi-pekko-http/src/test/resources/logback-test.xml
@@ -15,7 +15,7 @@
-
+
diff --git a/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClientH1TestSuite.scala b/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClientH1TestSuite.scala
new file mode 100644
index 000000000..06148812b
--- /dev/null
+++ b/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClientH1TestSuite.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.connectors.awsspi
+
+import software.amazon.awssdk.http.{ SdkAsyncHttpClientH1TestSuite, SdkHttpConfigurationOption }
+import software.amazon.awssdk.http.async.SdkAsyncHttpClient
+import software.amazon.awssdk.utils.AttributeMap
+
+class PekkoHttpClientH1TestSuite extends SdkAsyncHttpClientH1TestSuite {
+
+ override def setupClient(): SdkAsyncHttpClient = {
+ PekkoHttpClient.builder().buildWithDefaults(
+ AttributeMap.builder().put(SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES, Boolean.box(true)).build());
+ }
+
+ // Failed tests
+ // The logic to not reuse connections on server error status is not implemented in PekkoHttpClient, and
+ // it seems that it is being reverted in https://github.com/aws/aws-sdk-java-v2/pull/5607
+ override def connectionReceiveServerErrorStatusShouldNotReuseConnection(): Unit = ()
+
+}
diff --git a/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClientSpec.scala b/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClientSpec.scala
index 6046aa1b6..3ae36ed21 100644
--- a/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClientSpec.scala
+++ b/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClientSpec.scala
@@ -18,13 +18,20 @@
package org.apache.pekko.stream.connectors.awsspi
import java.util.Collections
+import com.typesafe.config.ConfigFactory
import org.apache.pekko
import pekko.http.scaladsl.model.headers.`Content-Type`
import pekko.http.scaladsl.model.MediaTypes
+import pekko.http.scaladsl.settings.{ ClientConnectionSettings, ConnectionPoolSettings }
+import pekko.util.JavaDurationConverters._
import org.scalatest.OptionValues
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec
+import software.amazon.awssdk.http.SdkHttpConfigurationOption
+import software.amazon.awssdk.utils.AttributeMap
+
+import scala.concurrent.duration._
class PekkoHttpClientSpec extends AnyWordSpec with Matchers with OptionValues {
@@ -47,5 +54,86 @@ class PekkoHttpClientSpec extends AnyWordSpec with Matchers with OptionValues {
contentTypeHeader.value.lowercaseName() shouldBe `Content-Type`.lowercaseName
reqHeaders should have size 1
}
+ "build() should use default ConnectionPoolSettings" in {
+ val pekkoClient: PekkoHttpClient = new PekkoHttpAsyncHttpService().createAsyncHttpClientFactory()
+ .build()
+ .asInstanceOf[PekkoHttpClient]
+
+ pekkoClient.connectionSettings shouldBe ConnectionPoolSettings(ConfigFactory.load())
+ }
+
+ "withConnectionPoolSettingsBuilderFromAttributeMap().buildWithDefaults() should propagate configuration options" in {
+ val attributeMap = AttributeMap.builder()
+ .put(SdkHttpConfigurationOption.CONNECTION_TIMEOUT, 1.second.asJava)
+ .put(SdkHttpConfigurationOption.CONNECTION_MAX_IDLE_TIMEOUT, 2.second.asJava)
+ .put(SdkHttpConfigurationOption.MAX_CONNECTIONS, Integer.valueOf(3))
+ .put(SdkHttpConfigurationOption.CONNECTION_TIME_TO_LIVE, 4.second.asJava)
+ .build()
+ val pekkoClient: PekkoHttpClient = new PekkoHttpAsyncHttpService().createAsyncHttpClientFactory()
+ .withConnectionPoolSettingsBuilderFromAttributeMap()
+ .buildWithDefaults(attributeMap)
+ .asInstanceOf[PekkoHttpClient]
+
+ pekkoClient.connectionSettings.connectionSettings.connectingTimeout shouldBe 1.second
+ pekkoClient.connectionSettings.connectionSettings.idleTimeout shouldBe 2.seconds
+ pekkoClient.connectionSettings.maxConnections shouldBe 3
+ pekkoClient.connectionSettings.maxConnectionLifetime shouldBe 4.seconds
+ }
+
+ "withConnectionPoolSettingsBuilderFromAttributeMap().build() should fallback to GLOBAL_HTTP_DEFAULTS" in {
+ val pekkoClient: PekkoHttpClient = new PekkoHttpAsyncHttpService().createAsyncHttpClientFactory()
+ .withConnectionPoolSettingsBuilderFromAttributeMap()
+ .build()
+ .asInstanceOf[PekkoHttpClient]
+
+ pekkoClient.connectionSettings.connectionSettings.connectingTimeout shouldBe
+ SdkHttpConfigurationOption.GLOBAL_HTTP_DEFAULTS.get(SdkHttpConfigurationOption.CONNECTION_TIMEOUT).asScala
+ pekkoClient.connectionSettings.connectionSettings.idleTimeout shouldBe
+ SdkHttpConfigurationOption.GLOBAL_HTTP_DEFAULTS.get(SdkHttpConfigurationOption.CONNECTION_MAX_IDLE_TIMEOUT).asScala
+ pekkoClient.connectionSettings.maxConnections shouldBe
+ SdkHttpConfigurationOption.GLOBAL_HTTP_DEFAULTS.get(SdkHttpConfigurationOption.MAX_CONNECTIONS).intValue()
+ infiniteToZero(pekkoClient.connectionSettings.maxConnectionLifetime) shouldBe
+ SdkHttpConfigurationOption.GLOBAL_HTTP_DEFAULTS.get(SdkHttpConfigurationOption.CONNECTION_TIME_TO_LIVE)
+ }
+
+ "withConnectionPoolSettingsBuilder().build() should use passed connectionPoolSettings builder" in {
+ val connectionPoolSettings = ConnectionPoolSettings(ConfigFactory.load())
+ .withConnectionSettings(
+ ClientConnectionSettings(ConfigFactory.load())
+ .withConnectingTimeout(1.second)
+ .withIdleTimeout(2.seconds)
+ )
+ .withMaxConnections(3)
+ .withMaxConnectionLifetime(4.seconds)
+
+ val pekkoClient: PekkoHttpClient = new PekkoHttpAsyncHttpService().createAsyncHttpClientFactory()
+ .withConnectionPoolSettingsBuilder((_, _) => connectionPoolSettings)
+ .build()
+ .asInstanceOf[PekkoHttpClient]
+
+ pekkoClient.connectionSettings shouldBe connectionPoolSettings
+ }
+
+ "withConnectionPoolSettings().build() should use passed ConnectionPoolSettings" in {
+ val connectionPoolSettings = ConnectionPoolSettings(ConfigFactory.load())
+ .withConnectionSettings(
+ ClientConnectionSettings(ConfigFactory.load())
+ .withConnectingTimeout(1.second)
+ .withIdleTimeout(2.seconds)
+ )
+ .withMaxConnections(3)
+ .withMaxConnectionLifetime(4.seconds)
+ val pekkoClient: PekkoHttpClient = new PekkoHttpAsyncHttpService().createAsyncHttpClientFactory()
+ .withConnectionPoolSettings(connectionPoolSettings)
+ .build()
+ .asInstanceOf[PekkoHttpClient]
+
+ pekkoClient.connectionSettings shouldBe connectionPoolSettings
+ }
+ }
+
+ private def infiniteToZero(duration: scala.concurrent.duration.Duration): java.time.Duration = duration match {
+ case _: scala.concurrent.duration.Duration.Infinite => java.time.Duration.ZERO
+ case duration: FiniteDuration => duration.asJava
}
}
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 137bb6b95..2d7ae2901 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -10,6 +10,7 @@
import sbt._
import Common.isScala3
import Keys._
+import com.github.sbt.junit.jupiter.sbt.Import.JupiterKeys
object Dependencies {
@@ -142,9 +143,11 @@ object Dependencies {
ExclusionRule("software.amazon.awssdk", "netty-nio-client")),
("software.amazon.awssdk" % "s3" % AwsSdk2Version % "it,test").excludeAll(
ExclusionRule("software.amazon.awssdk", "netty-nio-client")),
+ ("software.amazon.awssdk" % "http-client-tests" % AwsSdk2Version % "it,test").excludeAll(
+ ExclusionRule("software.amazon.awssdk", "netty-nio-client")),
"com.dimafeng" %% "testcontainers-scala" % TestContainersScalaTestVersion % Test,
+ "com.github.sbt.junit" % "jupiter-interface" % JupiterKeys.jupiterVersion.value % Test,
"org.scalatest" %% "scalatest" % ScalaTestVersion % "it,test",
- "org.scalatestplus" %% "junit-4-13" % scalaTestScalaCheckVersion % "it,test",
"ch.qos.logback" % "logback-classic" % LogbackVersion % "it,test"))
val AwsLambda = Seq(
diff --git a/project/plugins.sbt b/project/plugins.sbt
index 37abc6f69..609ec7be5 100644
--- a/project/plugins.sbt
+++ b/project/plugins.sbt
@@ -31,3 +31,5 @@ addSbtPlugin("com.thoughtworks.sbt-api-mappings" % "sbt-api-mappings" % "3.0.2")
addSbtPlugin("org.apache.pekko" % "pekko-grpc-sbt-plugin" % "1.1.0-M1")
// templating
addSbtPlugin("com.github.sbt" % "sbt-boilerplate" % "0.7.0")
+// Run JUnit 5 tests with sbt
+addSbtPlugin("com.github.sbt.junit" % "sbt-jupiter-interface" % "0.13.0")