From 0ab10697cadeb54514da1d240ca78d04e0368b13 Mon Sep 17 00:00:00 2001 From: Alex Berezovskiy Date: Wed, 10 Jul 2019 22:54:38 +0300 Subject: [PATCH 1/5] Update sbt and plugins --- project/build.properties | 2 +- project/plugins.sbt | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/project/build.properties b/project/build.properties index d6e3507..c0bab04 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1 +1 @@ -sbt.version=1.1.6 +sbt.version=1.2.8 diff --git a/project/plugins.sbt b/project/plugins.sbt index cdf6ae7..8ab2696 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1,4 +1,4 @@ addSbtPlugin("com.github.tkawachi" % "sbt-doctest" % "0.8.0") -addSbtPlugin("com.github.gseitz" % "sbt-release" % "1.0.9") -addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.1.2") -addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "2.3") \ No newline at end of file +addSbtPlugin("com.github.gseitz" % "sbt-release" % "1.0.11") +addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.1.2-1") +addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "2.5") From 5d6f4593c47c5a9614343590f3114651f18411d0 Mon Sep 17 00:00:00 2001 From: Alex Berezovskiy Date: Wed, 10 Jul 2019 22:56:39 +0300 Subject: [PATCH 2/5] Update Scala 2.12 to 2.12.8 --- .travis.yml | 2 +- build.sbt | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.travis.yml b/.travis.yml index 2d6873e..51d7592 100644 --- a/.travis.yml +++ b/.travis.yml @@ -3,7 +3,7 @@ language : scala scala: - 2.11.12 - - 2.12.6 + - 2.12.8 cache: directories: diff --git a/build.sbt b/build.sbt index a1ed552..f7fdfbc 100644 --- a/build.sbt +++ b/build.sbt @@ -9,8 +9,8 @@ lazy val contributors = Seq( lazy val commonSettings = Seq( organization := "com.spinoco", - scalaVersion := "2.12.6", - crossScalaVersions := Seq("2.11.12", "2.12.6"), + scalaVersion := "2.12.8", + crossScalaVersions := Seq("2.11.12", "2.12.8"), scalacOptions ++= Seq( "-feature", "-deprecation", From 3acb0250406ef327e276ecf904e6e89518fa12df Mon Sep 17 00:00:00 2001 From: Alex Berezovskiy Date: Thu, 11 Jul 2019 01:07:25 +0300 Subject: [PATCH 3/5] Fix typo --- src/main/scala/spinoco/fs2/http/HttpServer.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/scala/spinoco/fs2/http/HttpServer.scala b/src/main/scala/spinoco/fs2/http/HttpServer.scala index 11d9b27..b40dbdf 100644 --- a/src/main/scala/spinoco/fs2/http/HttpServer.scala +++ b/src/main/scala/spinoco/fs2/http/HttpServer.scala @@ -19,7 +19,7 @@ object HttpServer { /** * Creates simple http server, * - * Serve will run after the resulting stream is run. + * Server will run after the resulting stream is run. * * @param bindTo Address and port where to bind server to * @param maxConcurrent Maximum requests to process concurrently From b66956ae25695c8264f6cdb9b2ecdd7801dd5d0d Mon Sep 17 00:00:00 2001 From: Alex Berezovskiy Date: Thu, 11 Jul 2019 01:08:55 +0300 Subject: [PATCH 4/5] Update dependencies --- build.sbt | 16 ++++++++-------- src/main/scala/spinoco/fs2/http/HttpClient.scala | 6 +++--- src/main/scala/spinoco/fs2/http/HttpServer.scala | 6 +++--- src/main/scala/spinoco/fs2/http/http.scala | 2 +- .../spinoco/fs2/http/websocket/WebSocket.scala | 2 +- 5 files changed, 16 insertions(+), 16 deletions(-) diff --git a/build.sbt b/build.sbt index f7fdfbc..55f97a9 100644 --- a/build.sbt +++ b/build.sbt @@ -26,15 +26,15 @@ lazy val commonSettings = Seq( scalacOptions in (Compile, console) ~= {_.filterNot("-Ywarn-unused-import" == _)}, scalacOptions in (Test, console) := (scalacOptions in (Compile, console)).value, libraryDependencies ++= Seq( - compilerPlugin("org.scalamacros" % "paradise" % "2.1.0" cross CrossVersion.full) - , "org.scodec" %% "scodec-bits" % "1.1.4" - , "org.scodec" %% "scodec-core" % "1.10.3" - , "com.spinoco" %% "protocol-http" % "0.3.15" - , "com.spinoco" %% "protocol-websocket" % "0.3.15" - , "co.fs2" %% "fs2-core" % "1.0.0" - , "co.fs2" %% "fs2-io" % "1.0.0" + compilerPlugin("org.scalamacros" % "paradise" % "2.1.1" cross CrossVersion.full) + , "org.scodec" %% "scodec-bits" % "1.1.12" + , "org.scodec" %% "scodec-core" % "1.11.4" + , "com.spinoco" %% "protocol-http" % "0.3.17" + , "com.spinoco" %% "protocol-websocket" % "0.3.17" + , "co.fs2" %% "fs2-core" % "1.0.5" + , "co.fs2" %% "fs2-io" % "1.0.5" , "com.spinoco" %% "fs2-crypto" % "0.4.0" - , "org.scalacheck" %% "scalacheck" % "1.13.4" % "test" + , "org.scalacheck" %% "scalacheck" % "1.14.0" % Test ), scmInfo := Some(ScmInfo(url("https://github.com/Spinoco/fs2-http"), "git@github.com:Spinoco/fs2-http.git")), homepage := None, diff --git a/src/main/scala/spinoco/fs2/http/HttpClient.scala b/src/main/scala/spinoco/fs2/http/HttpClient.scala index 7b75ad9..3cf1801 100644 --- a/src/main/scala/spinoco/fs2/http/HttpClient.scala +++ b/src/main/scala/spinoco/fs2/http/HttpClient.scala @@ -128,7 +128,7 @@ trait HttpClient[F[_]] { , timeout: Duration ): Stream[F, HttpResponse[F]] = { Stream.eval(addressForRequest[F](request.scheme, request.host)).flatMap { address => - Stream.resource(io.tcp.client[F](address)) + Stream.resource(Socket.client[F](address)) .evalMap { socket => if (!request.isSecure) Applicative[F].pure(socket) else clientLiftToSecure[F](sslS, sslCtx)(socket, request.host) @@ -174,7 +174,7 @@ trait HttpClient[F[_]] { timeout match { case fin: FiniteDuration => eval(clock.realTime(TimeUnit.MILLISECONDS)).flatMap { start => - HttpRequest.toStream(request, requestCodec).to(socket.writes(Some(fin))).last.onFinalize(socket.endOfOutput).flatMap { _ => + HttpRequest.toStream(request, requestCodec).through(socket.writes(Some(fin))).last.onFinalize(socket.endOfOutput).flatMap { _ => eval(SignallingRef[F, Boolean](true)).flatMap { timeoutSignal => eval(clock.realTime(TimeUnit.MILLISECONDS)).flatMap { sent => val remains = fin - (sent - start).millis @@ -186,7 +186,7 @@ trait HttpClient[F[_]] { }}}} case _ => - HttpRequest.toStream(request, requestCodec).to(socket.writes(None)).last.onFinalize(socket.endOfOutput).flatMap { _ => + HttpRequest.toStream(request, requestCodec).through(socket.writes(None)).last.onFinalize(socket.endOfOutput).flatMap { _ => socket.reads(chunkSize, None) through HttpResponse.fromStream[F](maxResponseHeaderSize, responseCodec) } } diff --git a/src/main/scala/spinoco/fs2/http/HttpServer.scala b/src/main/scala/spinoco/fs2/http/HttpServer.scala index b40dbdf..f2d4cf2 100644 --- a/src/main/scala/spinoco/fs2/http/HttpServer.scala +++ b/src/main/scala/spinoco/fs2/http/HttpServer.scala @@ -3,7 +3,7 @@ package spinoco.fs2.http import java.net.InetSocketAddress import java.nio.channels.AsynchronousChannelGroup -import cats.effect.{ConcurrentEffect, Sync, Timer} +import cats.effect.{ConcurrentEffect, ContextShift, Sync, Timer} import cats.syntax.all._ import fs2._ import fs2.concurrent.SignallingRef @@ -36,7 +36,7 @@ object HttpServer { * Request is not suplied if failure happened before request was constructed. * */ - def apply[F[_] : ConcurrentEffect : Timer]( + def apply[F[_] : ConcurrentEffect : ContextShift : Timer]( maxConcurrent: Int = Int.MaxValue , receiveBufferSize: Int = 256 * 1024 , maxHeaderSize: Int = 10 *1024 @@ -58,7 +58,7 @@ object HttpServer { case _ => (false, 0.millis) } - io.tcp.server[F](bindTo, receiveBufferSize = receiveBufferSize).map { resource => + io.tcp.Socket.server[F](bindTo, receiveBufferSize = receiveBufferSize).map { resource => Stream.resource(resource).flatMap { socket => eval(SignallingRef(initial)).flatMap { timeoutSignal => readWithTimeout[F](socket, readDuration, timeoutSignal.get, receiveBufferSize) diff --git a/src/main/scala/spinoco/fs2/http/http.scala b/src/main/scala/spinoco/fs2/http/http.scala index 94624ec..728d8b1 100644 --- a/src/main/scala/spinoco/fs2/http/http.scala +++ b/src/main/scala/spinoco/fs2/http/http.scala @@ -30,7 +30,7 @@ package object http { * Request will fail, if the header won't be read within this timeout. * @param service Pipe that defines handling of each incoming request and produces a response */ - def server[F[_] : ConcurrentEffect : Timer]( + def server[F[_] : ConcurrentEffect : ContextShift : Timer]( bindTo: InetSocketAddress , maxConcurrent: Int = Int.MaxValue , receiveBufferSize: Int = 256 * 1024 diff --git a/src/main/scala/spinoco/fs2/http/websocket/WebSocket.scala b/src/main/scala/spinoco/fs2/http/websocket/WebSocket.scala index 38dfaec..a5828e5 100644 --- a/src/main/scala/spinoco/fs2/http/websocket/WebSocket.scala +++ b/src/main/scala/spinoco/fs2/http/websocket/WebSocket.scala @@ -95,7 +95,7 @@ object WebSocket { import spinoco.fs2.http.internal._ import Stream._ eval(addressForRequest[F](if (request.secure) HttpScheme.WSS else HttpScheme.WS, request.hostPort)).flatMap { address => - Stream.resource(io.tcp.client[F](address, receiveBufferSize = receiveBufferSize)) + Stream.resource(io.tcp.Socket.client[F](address, receiveBufferSize = receiveBufferSize)) .evalMap { socket => if (request.secure) clientLiftToSecure(sslES, sslContext)(socket, request.hostPort) else Applicative[F].pure(socket) } .flatMap { socket => val (header, fingerprint) = impl.createRequestHeaders(request.header) From 84aa287878258e105936f84277382d5181b4f6c9 Mon Sep 17 00:00:00 2001 From: Alex Berezovskiy Date: Thu, 11 Jul 2019 16:10:37 +0300 Subject: [PATCH 5/5] Add support for Scala 2.13 --- .travis.yml | 1 + build.sbt | 69 +++++++++++++------ .../scala/spinoco/fs2/http/HttpClient.scala | 7 +- .../fs2/http/HttpRequestOrResponse.scala | 1 + .../scala/spinoco/fs2/http/HttpServer.scala | 8 +-- src/main/scala/spinoco/fs2/http/http.scala | 6 +- .../spinoco/fs2/http/internal/internal.scala | 2 +- .../fs2/http/websocket/WebSocket.scala | 10 +-- .../spinoco/fs2/http/HttpServerSpec.scala | 2 +- .../scala/spinoco/fs2/http/Resources.scala | 9 +-- .../fs2/http/internal/HttpServerApp.scala | 2 +- 11 files changed, 72 insertions(+), 45 deletions(-) diff --git a/.travis.yml b/.travis.yml index 51d7592..e36a3cc 100644 --- a/.travis.yml +++ b/.travis.yml @@ -4,6 +4,7 @@ language : scala scala: - 2.11.12 - 2.12.8 + - 2.13.0 cache: directories: diff --git a/build.sbt b/build.sbt index 55f97a9..c82511c 100644 --- a/build.sbt +++ b/build.sbt @@ -9,33 +9,21 @@ lazy val contributors = Seq( lazy val commonSettings = Seq( organization := "com.spinoco", - scalaVersion := "2.12.8", - crossScalaVersions := Seq("2.11.12", "2.12.8"), - scalacOptions ++= Seq( - "-feature", - "-deprecation", - "-language:implicitConversions", - "-language:higherKinds", - "-language:existentials", - "-language:postfixOps", - "-Xfatal-warnings", - "-Yno-adapted-args", - "-Ywarn-value-discard", - "-Ywarn-unused-import" - ), + scalaVersion := "2.13.0", + crossScalaVersions := Seq("2.11.12", "2.12.8", "2.13.0"), + scalacOptions ++= commonScalacOptions(scalaVersion.value), scalacOptions in (Compile, console) ~= {_.filterNot("-Ywarn-unused-import" == _)}, scalacOptions in (Test, console) := (scalacOptions in (Compile, console)).value, libraryDependencies ++= Seq( - compilerPlugin("org.scalamacros" % "paradise" % "2.1.1" cross CrossVersion.full) - , "org.scodec" %% "scodec-bits" % "1.1.12" + "org.scodec" %% "scodec-bits" % "1.1.12" , "org.scodec" %% "scodec-core" % "1.11.4" - , "com.spinoco" %% "protocol-http" % "0.3.17" - , "com.spinoco" %% "protocol-websocket" % "0.3.17" - , "co.fs2" %% "fs2-core" % "1.0.5" - , "co.fs2" %% "fs2-io" % "1.0.5" - , "com.spinoco" %% "fs2-crypto" % "0.4.0" + , "com.spinoco" %% "protocol-http" % "0.3.19-SNAPSHOT" + , "com.spinoco" %% "protocol-websocket" % "0.3.19-SNAPSHOT" + , "co.fs2" %% "fs2-core" % "1.1.0-M1" + , "co.fs2" %% "fs2-io" % "1.1.0-M1" + , "com.spinoco" %% "fs2-crypto" % "0.5.0-M1" , "org.scalacheck" %% "scalacheck" % "1.14.0" % Test - ), + ) ++ macroDependencies(scalaVersion.value), scmInfo := Some(ScmInfo(url("https://github.com/Spinoco/fs2-http"), "git@github.com:Spinoco/fs2-http.git")), homepage := None, licenses += ("MIT", url("http://opensource.org/licenses/MIT")), @@ -117,3 +105,40 @@ lazy val `fs2-http`= ) +def macroDependencies(scalaVersion: String) = + if (priorTo2_13(scalaVersion)) + Seq( + compilerPlugin(("org.scalamacros" %% "paradise" % "2.1.1").cross(CrossVersion.patch)) + ) + else Seq.empty + +def commonScalacOptions(scalaVersion: String) = + Seq( + "-encoding", "UTF-8", + "-feature", + "-deprecation", + "-language:existentials", + "-language:higherKinds", + "-language:implicitConversions", + "-language:postfixOps", + "-unchecked", + "-Ywarn-dead-code", + "-Ywarn-numeric-widen", + "-Ywarn-value-discard" + ) ++ (if (priorTo2_13(scalaVersion)) + Seq( + "-Ywarn-unused-import", + "-Yno-adapted-args", + "-Xfatal-warnings", // TODO: add the following two back to 2.13 (deprecation?) + ) + else + Seq( + "-Ymacro-annotations" + )) + +def priorTo2_13(scalaVersion: String): Boolean = + CrossVersion.partialVersion(scalaVersion) match { + case Some((2, minor)) if minor < 13 => true + case _ => false + } + diff --git a/src/main/scala/spinoco/fs2/http/HttpClient.scala b/src/main/scala/spinoco/fs2/http/HttpClient.scala index 3cf1801..c904e94 100644 --- a/src/main/scala/spinoco/fs2/http/HttpClient.scala +++ b/src/main/scala/spinoco/fs2/http/HttpClient.scala @@ -1,6 +1,5 @@ package spinoco.fs2.http -import java.nio.channels.AsynchronousChannelGroup import java.util.concurrent.TimeUnit import cats.Applicative @@ -8,7 +7,7 @@ import javax.net.ssl.SSLContext import cats.effect._ import fs2._ import fs2.concurrent.SignallingRef -import fs2.io.tcp.Socket +import fs2.io.tcp.{Socket, SocketGroup} import scodec.{Codec, Decoder, Encoder} import spinoco.fs2.http.internal.{addressForRequest, clientLiftToSecure, readWithTimeout} import spinoco.fs2.http.sse.{SSEDecoder, SSEEncoding} @@ -116,7 +115,7 @@ trait HttpClient[F[_]] { , responseCodec : Codec[HttpResponseHeader] , sslExecutionContext: => ExecutionContext , sslContext : => SSLContext - )(implicit AG: AsynchronousChannelGroup):F[HttpClient[F]] = Sync[F].delay { + )(implicit SG: SocketGroup):F[HttpClient[F]] = Sync[F].delay { lazy val sslCtx = sslContext lazy val sslS = sslExecutionContext @@ -128,7 +127,7 @@ trait HttpClient[F[_]] { , timeout: Duration ): Stream[F, HttpResponse[F]] = { Stream.eval(addressForRequest[F](request.scheme, request.host)).flatMap { address => - Stream.resource(Socket.client[F](address)) + Stream.resource(SG.client[F](address)) .evalMap { socket => if (!request.isSecure) Applicative[F].pure(socket) else clientLiftToSecure[F](sslS, sslCtx)(socket, request.host) diff --git a/src/main/scala/spinoco/fs2/http/HttpRequestOrResponse.scala b/src/main/scala/spinoco/fs2/http/HttpRequestOrResponse.scala index 11066c4..baae793 100644 --- a/src/main/scala/spinoco/fs2/http/HttpRequestOrResponse.scala +++ b/src/main/scala/spinoco/fs2/http/HttpRequestOrResponse.scala @@ -13,6 +13,7 @@ import header._ import spinoco.protocol.mime.{ContentType, MediaType} import scodec.bits.ByteVector +import spinoco.fs2.http.internal import spinoco.fs2.http.sse.{SSEEncoder, SSEEncoding} diff --git a/src/main/scala/spinoco/fs2/http/HttpServer.scala b/src/main/scala/spinoco/fs2/http/HttpServer.scala index f2d4cf2..27ccb92 100644 --- a/src/main/scala/spinoco/fs2/http/HttpServer.scala +++ b/src/main/scala/spinoco/fs2/http/HttpServer.scala @@ -1,12 +1,12 @@ package spinoco.fs2.http import java.net.InetSocketAddress -import java.nio.channels.AsynchronousChannelGroup import cats.effect.{ConcurrentEffect, ContextShift, Sync, Timer} import cats.syntax.all._ import fs2._ import fs2.concurrent.SignallingRef +import fs2.io.tcp.SocketGroup import scodec.Codec import spinoco.protocol.http.codec.{HttpRequestHeaderCodec, HttpResponseHeaderCodec} import spinoco.protocol.http.{HttpRequestHeader, HttpResponseHeader, HttpStatusCode} @@ -49,16 +49,16 @@ object HttpServer { , sendFailure: (Option[HttpRequestHeader], HttpResponse[F], Throwable) => Stream[F, Nothing] )( implicit - AG: AsynchronousChannelGroup + SG: SocketGroup ): Stream[F, Unit] = { import Stream._ - import internal._ + import spinoco.fs2.http.internal._ val (initial, readDuration) = requestHeaderReceiveTimeout match { case fin: FiniteDuration => (true, fin) case _ => (false, 0.millis) } - io.tcp.Socket.server[F](bindTo, receiveBufferSize = receiveBufferSize).map { resource => + SG.server[F](bindTo, receiveBufferSize = receiveBufferSize).map { resource => Stream.resource(resource).flatMap { socket => eval(SignallingRef(initial)).flatMap { timeoutSignal => readWithTimeout[F](socket, readDuration, timeoutSignal.get, receiveBufferSize) diff --git a/src/main/scala/spinoco/fs2/http/http.scala b/src/main/scala/spinoco/fs2/http/http.scala index 728d8b1..a814490 100644 --- a/src/main/scala/spinoco/fs2/http/http.scala +++ b/src/main/scala/spinoco/fs2/http/http.scala @@ -1,12 +1,12 @@ package spinoco.fs2 import java.net.InetSocketAddress -import java.nio.channels.AsynchronousChannelGroup import java.util.concurrent.Executors import javax.net.ssl.SSLContext import cats.effect.{ConcurrentEffect, ContextShift, Timer} import fs2._ +import fs2.io.tcp.SocketGroup import scodec.Codec import spinoco.protocol.http.{HttpRequestHeader, HttpResponseHeader} import spinoco.protocol.http.codec.{HttpRequestHeaderCodec, HttpResponseHeaderCodec} @@ -40,7 +40,7 @@ package object http { , responseCodec: Codec[HttpResponseHeader] = HttpResponseHeaderCodec.defaultCodec )( service: (HttpRequestHeader, Stream[F,Byte]) => Stream[F,HttpResponse[F]] - )(implicit AG: AsynchronousChannelGroup):Stream[F,Unit] = HttpServer( + )(implicit SG: SocketGroup):Stream[F,Unit] = HttpServer( maxConcurrent = maxConcurrent , receiveBufferSize = receiveBufferSize , maxHeaderSize = maxHeaderSize @@ -66,7 +66,7 @@ package object http { , responseCodec: Codec[HttpResponseHeader] = HttpResponseHeaderCodec.defaultCodec , sslStrategy: => ExecutionContext = ExecutionContext.fromExecutorService(Executors.newCachedThreadPool(util.mkThreadFactory("fs2-http-ssl", daemon = true))) , sslContext: => SSLContext = { val ctx = SSLContext.getInstance("TLS"); ctx.init(null,null,null); ctx } - )(implicit AG: AsynchronousChannelGroup):F[HttpClient[F]] = + )(implicit SG: SocketGroup):F[HttpClient[F]] = HttpClient(requestCodec, responseCodec, sslStrategy, sslContext) } diff --git a/src/main/scala/spinoco/fs2/http/internal/internal.scala b/src/main/scala/spinoco/fs2/http/internal/internal.scala index 2dd9bda..9be32ee 100644 --- a/src/main/scala/spinoco/fs2/http/internal/internal.scala +++ b/src/main/scala/spinoco/fs2/http/internal/internal.scala @@ -106,7 +106,7 @@ package object internal { eval(shallTimeout).flatMap { shallTimeout => if (!shallTimeout) socket.reads(chunkSize, None) else { - if (remains <= 0.millis) Stream.raiseError(new TimeoutException()) + if (remains <= 0.millis) Stream.raiseError[F](new TimeoutException()) else { eval(Sync[F].delay(System.currentTimeMillis())).flatMap { start => eval(socket.read(chunkSize, Some(remains))).flatMap { read => diff --git a/src/main/scala/spinoco/fs2/http/websocket/WebSocket.scala b/src/main/scala/spinoco/fs2/http/websocket/WebSocket.scala index a5828e5..dd5057b 100644 --- a/src/main/scala/spinoco/fs2/http/websocket/WebSocket.scala +++ b/src/main/scala/spinoco/fs2/http/websocket/WebSocket.scala @@ -1,7 +1,6 @@ package spinoco.fs2.http.websocket -import java.nio.channels.AsynchronousChannelGroup import java.util.concurrent.Executors import cats.Applicative @@ -10,6 +9,7 @@ import cats.effect.{Concurrent, ConcurrentEffect, ContextShift, Timer} import fs2.Chunk.ByteVectorChunk import fs2._ import fs2.concurrent.Queue +import fs2.io.tcp.SocketGroup import scodec.Attempt.{Failure, Successful} import scodec.bits.ByteVector import scodec.{Codec, Decoder, Encoder} @@ -91,21 +91,21 @@ object WebSocket { , responseCodec: Codec[HttpResponseHeader] = HttpResponseHeaderCodec.defaultCodec , sslES: => ExecutionContext = ExecutionContext.fromExecutorService(Executors.newCachedThreadPool(spinoco.fs2.http.util.mkThreadFactory("fs2-http-ssl", daemon = true))) , sslContext: => SSLContext = { val ctx = SSLContext.getInstance("TLS"); ctx.init(null,null,null); ctx } - )(implicit AG: AsynchronousChannelGroup): Stream[F, Option[HttpResponseHeader]] = { + )(implicit SG: SocketGroup): Stream[F, Option[HttpResponseHeader]] = { import spinoco.fs2.http.internal._ import Stream._ eval(addressForRequest[F](if (request.secure) HttpScheme.WSS else HttpScheme.WS, request.hostPort)).flatMap { address => - Stream.resource(io.tcp.Socket.client[F](address, receiveBufferSize = receiveBufferSize)) + Stream.resource(SG.client[F](address, receiveBufferSize = receiveBufferSize)) .evalMap { socket => if (request.secure) clientLiftToSecure(sslES, sslContext)(socket, request.hostPort) else Applicative[F].pure(socket) } .flatMap { socket => val (header, fingerprint) = impl.createRequestHeaders(request.header) requestCodec.encode(header) match { - case Failure(err) => Stream.raiseError(new Throwable(s"Failed to encode websocket request: $err")) + case Failure(err) => Stream.raiseError[F](new Throwable(s"Failed to encode websocket request: $err")) case Successful(headerBits) => eval(socket.write(ByteVectorChunk(headerBits.bytes ++ `\r\n\r\n`))).flatMap { _ => socket.reads(receiveBufferSize) through httpHeaderAndBody(maxHeaderSize) flatMap { case (respHeaderBytes, body) => responseCodec.decodeValue(respHeaderBytes.bits) match { - case Failure(err) => raiseError(new Throwable(s"Failed to decode websocket response: $err")) + case Failure(err) => raiseError[F](new Throwable(s"Failed to decode websocket response: $err")) case Successful(responseHeader) => impl.validateResponse[F](header, responseHeader, fingerprint).flatMap { case Some(resp) => emit(Some(resp)) diff --git a/src/test/scala/spinoco/fs2/http/HttpServerSpec.scala b/src/test/scala/spinoco/fs2/http/HttpServerSpec.scala index 0880d70..55315c1 100644 --- a/src/test/scala/spinoco/fs2/http/HttpServerSpec.scala +++ b/src/test/scala/spinoco/fs2/http/HttpServerSpec.scala @@ -23,7 +23,7 @@ object HttpServerSpec extends Properties("HttpServer"){ if (request.path != Uri.Path / "echo") Stream.emit(HttpResponse[IO](HttpStatusCode.Ok).withUtf8Body("Hello World")).covary[IO] else { val ct = request.headers.collectFirst { case `Content-Type`(ct0) => ct0 }.getOrElse(ContentType.BinaryContent(MediaType.`application/octet-stream`, None)) - val size = request.headers.collectFirst { case `Content-Length`(sz) => sz }.getOrElse(0l) + val size = request.headers.collectFirst { case `Content-Length`(sz) => sz }.getOrElse(0L) val ok = HttpResponse(HttpStatusCode.Ok).chunkedEncoding.withContentType(ct).withBodySize(size) Stream.emit(ok.copy(body = body.take(size))) diff --git a/src/test/scala/spinoco/fs2/http/Resources.scala b/src/test/scala/spinoco/fs2/http/Resources.scala index 744f677..5084966 100644 --- a/src/test/scala/spinoco/fs2/http/Resources.scala +++ b/src/test/scala/spinoco/fs2/http/Resources.scala @@ -3,7 +3,8 @@ package spinoco.fs2.http import java.nio.channels.AsynchronousChannelGroup import java.util.concurrent.Executors -import cats.effect.{Concurrent, ContextShift, IO, Timer} +import cats.effect.{Blocker, Concurrent, ContextShift, IO, Timer} +import fs2.io.tcp.SocketGroup import scala.concurrent.ExecutionContext @@ -13,7 +14,7 @@ object Resources { implicit val _cxs: ContextShift[IO] = IO.contextShift(ExecutionContext.Implicits.global) implicit val _timer: Timer[IO] = IO.timer(ExecutionContext.Implicits.global) implicit val _concurrent: Concurrent[IO] = IO.ioConcurrentEffect(_cxs) - implicit val AG: AsynchronousChannelGroup = AsynchronousChannelGroup.withThreadPool(Executors.newCachedThreadPool(util.mkThreadFactory("fs2-http-spec-AG", daemon = true))) - - + lazy val AG: AsynchronousChannelGroup = AsynchronousChannelGroup.withThreadPool(Executors.newCachedThreadPool(util.mkThreadFactory("fs2-http-spec-AG", daemon = true))) + lazy val blocker: Blocker = Blocker.liftExecutionContext(ExecutionContext.global) + implicit val SG: SocketGroup = new SocketGroup(AG, blocker) } diff --git a/src/test/scala/spinoco/fs2/http/internal/HttpServerApp.scala b/src/test/scala/spinoco/fs2/http/internal/HttpServerApp.scala index 1c9399a..664829f 100644 --- a/src/test/scala/spinoco/fs2/http/internal/HttpServerApp.scala +++ b/src/test/scala/spinoco/fs2/http/internal/HttpServerApp.scala @@ -19,7 +19,7 @@ object HttpServerApp extends App { if (request.path != Uri.Path / "echo") Stream.emit(HttpResponse[IO](HttpStatusCode.Ok).withUtf8Body("Hello World")).covary[IO] else { val ct = request.headers.collectFirst { case `Content-Type`(ct) => ct }.getOrElse(ContentType.BinaryContent(MediaType.`application/octet-stream`, None)) - val size = request.headers.collectFirst { case `Content-Length`(sz) => sz }.getOrElse(0l) + val size = request.headers.collectFirst { case `Content-Length`(sz) => sz }.getOrElse(0L) val ok = HttpResponse(HttpStatusCode.Ok).chunkedEncoding.withContentType(ct).withBodySize(size) Stream.emit(ok.copy(body = body.take(size)))