Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ language : scala

scala:
- 2.11.12
- 2.12.6
- 2.12.8
- 2.13.0

cache:
directories:
Expand Down
73 changes: 49 additions & 24 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -9,33 +9,21 @@ lazy val contributors = Seq(

lazy val commonSettings = Seq(
organization := "com.spinoco",
scalaVersion := "2.12.6",
crossScalaVersions := Seq("2.11.12", "2.12.6"),
scalacOptions ++= Seq(
"-feature",
"-deprecation",
"-language:implicitConversions",
"-language:higherKinds",
"-language:existentials",
"-language:postfixOps",
"-Xfatal-warnings",
"-Yno-adapted-args",
"-Ywarn-value-discard",
"-Ywarn-unused-import"
),
scalaVersion := "2.13.0",
crossScalaVersions := Seq("2.11.12", "2.12.8", "2.13.0"),
scalacOptions ++= commonScalacOptions(scalaVersion.value),
scalacOptions in (Compile, console) ~= {_.filterNot("-Ywarn-unused-import" == _)},
scalacOptions in (Test, console) := (scalacOptions in (Compile, console)).value,
libraryDependencies ++= Seq(
compilerPlugin("org.scalamacros" % "paradise" % "2.1.0" cross CrossVersion.full)
, "org.scodec" %% "scodec-bits" % "1.1.4"
, "org.scodec" %% "scodec-core" % "1.10.3"
, "com.spinoco" %% "protocol-http" % "0.3.15"
, "com.spinoco" %% "protocol-websocket" % "0.3.15"
, "co.fs2" %% "fs2-core" % "1.0.0"
, "co.fs2" %% "fs2-io" % "1.0.0"
, "com.spinoco" %% "fs2-crypto" % "0.4.0"
, "org.scalacheck" %% "scalacheck" % "1.13.4" % "test"
),
"org.scodec" %% "scodec-bits" % "1.1.12"
, "org.scodec" %% "scodec-core" % "1.11.4"
, "com.spinoco" %% "protocol-http" % "0.3.19-SNAPSHOT"
, "com.spinoco" %% "protocol-websocket" % "0.3.19-SNAPSHOT"
, "co.fs2" %% "fs2-core" % "1.1.0-M1"
, "co.fs2" %% "fs2-io" % "1.1.0-M1"
, "com.spinoco" %% "fs2-crypto" % "0.5.0-M1"
, "org.scalacheck" %% "scalacheck" % "1.14.0" % Test
) ++ macroDependencies(scalaVersion.value),
scmInfo := Some(ScmInfo(url("https://github.com/Spinoco/fs2-http"), "[email protected]:Spinoco/fs2-http.git")),
homepage := None,
licenses += ("MIT", url("http://opensource.org/licenses/MIT")),
Expand Down Expand Up @@ -117,3 +105,40 @@ lazy val `fs2-http`=
)


def macroDependencies(scalaVersion: String) =
if (priorTo2_13(scalaVersion))
Seq(
compilerPlugin(("org.scalamacros" %% "paradise" % "2.1.1").cross(CrossVersion.patch))
)
else Seq.empty

def commonScalacOptions(scalaVersion: String) =
Seq(
"-encoding", "UTF-8",
"-feature",
"-deprecation",
"-language:existentials",
"-language:higherKinds",
"-language:implicitConversions",
"-language:postfixOps",
"-unchecked",
"-Ywarn-dead-code",
"-Ywarn-numeric-widen",
"-Ywarn-value-discard"
) ++ (if (priorTo2_13(scalaVersion))
Seq(
"-Ywarn-unused-import",
"-Yno-adapted-args",
"-Xfatal-warnings", // TODO: add the following two back to 2.13 (deprecation?)
)
else
Seq(
"-Ymacro-annotations"
))

def priorTo2_13(scalaVersion: String): Boolean =
CrossVersion.partialVersion(scalaVersion) match {
case Some((2, minor)) if minor < 13 => true
case _ => false
}

2 changes: 1 addition & 1 deletion project/build.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
sbt.version=1.1.6
sbt.version=1.2.8
6 changes: 3 additions & 3 deletions project/plugins.sbt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
addSbtPlugin("com.github.tkawachi" % "sbt-doctest" % "0.8.0")
addSbtPlugin("com.github.gseitz" % "sbt-release" % "1.0.9")
addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.1.2")
addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "2.3")
addSbtPlugin("com.github.gseitz" % "sbt-release" % "1.0.11")
addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.1.2-1")
addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "2.5")
11 changes: 5 additions & 6 deletions src/main/scala/spinoco/fs2/http/HttpClient.scala
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
package spinoco.fs2.http

import java.nio.channels.AsynchronousChannelGroup
import java.util.concurrent.TimeUnit

import cats.Applicative
import javax.net.ssl.SSLContext
import cats.effect._
import fs2._
import fs2.concurrent.SignallingRef
import fs2.io.tcp.Socket
import fs2.io.tcp.{Socket, SocketGroup}
import scodec.{Codec, Decoder, Encoder}
import spinoco.fs2.http.internal.{addressForRequest, clientLiftToSecure, readWithTimeout}
import spinoco.fs2.http.sse.{SSEDecoder, SSEEncoding}
Expand Down Expand Up @@ -116,7 +115,7 @@ trait HttpClient[F[_]] {
, responseCodec : Codec[HttpResponseHeader]
, sslExecutionContext: => ExecutionContext
, sslContext : => SSLContext
)(implicit AG: AsynchronousChannelGroup):F[HttpClient[F]] = Sync[F].delay {
)(implicit SG: SocketGroup):F[HttpClient[F]] = Sync[F].delay {
lazy val sslCtx = sslContext
lazy val sslS = sslExecutionContext

Expand All @@ -128,7 +127,7 @@ trait HttpClient[F[_]] {
, timeout: Duration
): Stream[F, HttpResponse[F]] = {
Stream.eval(addressForRequest[F](request.scheme, request.host)).flatMap { address =>
Stream.resource(io.tcp.client[F](address))
Stream.resource(SG.client[F](address))
.evalMap { socket =>
if (!request.isSecure) Applicative[F].pure(socket)
else clientLiftToSecure[F](sslS, sslCtx)(socket, request.host)
Expand Down Expand Up @@ -174,7 +173,7 @@ trait HttpClient[F[_]] {
timeout match {
case fin: FiniteDuration =>
eval(clock.realTime(TimeUnit.MILLISECONDS)).flatMap { start =>
HttpRequest.toStream(request, requestCodec).to(socket.writes(Some(fin))).last.onFinalize(socket.endOfOutput).flatMap { _ =>
HttpRequest.toStream(request, requestCodec).through(socket.writes(Some(fin))).last.onFinalize(socket.endOfOutput).flatMap { _ =>
eval(SignallingRef[F, Boolean](true)).flatMap { timeoutSignal =>
eval(clock.realTime(TimeUnit.MILLISECONDS)).flatMap { sent =>
val remains = fin - (sent - start).millis
Expand All @@ -186,7 +185,7 @@ trait HttpClient[F[_]] {
}}}}

case _ =>
HttpRequest.toStream(request, requestCodec).to(socket.writes(None)).last.onFinalize(socket.endOfOutput).flatMap { _ =>
HttpRequest.toStream(request, requestCodec).through(socket.writes(None)).last.onFinalize(socket.endOfOutput).flatMap { _ =>
socket.reads(chunkSize, None) through HttpResponse.fromStream[F](maxResponseHeaderSize, responseCodec)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import header._
import spinoco.protocol.mime.{ContentType, MediaType}
import scodec.bits.ByteVector

import spinoco.fs2.http.internal
import spinoco.fs2.http.sse.{SSEEncoder, SSEEncoding}


Expand Down
14 changes: 7 additions & 7 deletions src/main/scala/spinoco/fs2/http/HttpServer.scala
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package spinoco.fs2.http

import java.net.InetSocketAddress
import java.nio.channels.AsynchronousChannelGroup

import cats.effect.{ConcurrentEffect, Sync, Timer}
import cats.effect.{ConcurrentEffect, ContextShift, Sync, Timer}
import cats.syntax.all._
import fs2._
import fs2.concurrent.SignallingRef
import fs2.io.tcp.SocketGroup
import scodec.Codec
import spinoco.protocol.http.codec.{HttpRequestHeaderCodec, HttpResponseHeaderCodec}
import spinoco.protocol.http.{HttpRequestHeader, HttpResponseHeader, HttpStatusCode}
Expand All @@ -19,7 +19,7 @@ object HttpServer {
/**
* Creates simple http server,
*
* Serve will run after the resulting stream is run.
* Server will run after the resulting stream is run.
*
* @param bindTo Address and port where to bind server to
* @param maxConcurrent Maximum requests to process concurrently
Expand All @@ -36,7 +36,7 @@ object HttpServer {
* Request is not suplied if failure happened before request was constructed.
*
*/
def apply[F[_] : ConcurrentEffect : Timer](
def apply[F[_] : ConcurrentEffect : ContextShift : Timer](
maxConcurrent: Int = Int.MaxValue
, receiveBufferSize: Int = 256 * 1024
, maxHeaderSize: Int = 10 *1024
Expand All @@ -49,16 +49,16 @@ object HttpServer {
, sendFailure: (Option[HttpRequestHeader], HttpResponse[F], Throwable) => Stream[F, Nothing]
)(
implicit
AG: AsynchronousChannelGroup
SG: SocketGroup
): Stream[F, Unit] = {
import Stream._
import internal._
import spinoco.fs2.http.internal._
val (initial, readDuration) = requestHeaderReceiveTimeout match {
case fin: FiniteDuration => (true, fin)
case _ => (false, 0.millis)
}

io.tcp.server[F](bindTo, receiveBufferSize = receiveBufferSize).map { resource =>
SG.server[F](bindTo, receiveBufferSize = receiveBufferSize).map { resource =>
Stream.resource(resource).flatMap { socket =>
eval(SignallingRef(initial)).flatMap { timeoutSignal =>
readWithTimeout[F](socket, readDuration, timeoutSignal.get, receiveBufferSize)
Expand Down
8 changes: 4 additions & 4 deletions src/main/scala/spinoco/fs2/http/http.scala
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package spinoco.fs2

import java.net.InetSocketAddress
import java.nio.channels.AsynchronousChannelGroup
import java.util.concurrent.Executors

import javax.net.ssl.SSLContext
import cats.effect.{ConcurrentEffect, ContextShift, Timer}
import fs2._
import fs2.io.tcp.SocketGroup
import scodec.Codec
import spinoco.protocol.http.{HttpRequestHeader, HttpResponseHeader}
import spinoco.protocol.http.codec.{HttpRequestHeaderCodec, HttpResponseHeaderCodec}
Expand All @@ -30,7 +30,7 @@ package object http {
* Request will fail, if the header won't be read within this timeout.
* @param service Pipe that defines handling of each incoming request and produces a response
*/
def server[F[_] : ConcurrentEffect : Timer](
def server[F[_] : ConcurrentEffect : ContextShift : Timer](
bindTo: InetSocketAddress
, maxConcurrent: Int = Int.MaxValue
, receiveBufferSize: Int = 256 * 1024
Expand All @@ -40,7 +40,7 @@ package object http {
, responseCodec: Codec[HttpResponseHeader] = HttpResponseHeaderCodec.defaultCodec
)(
service: (HttpRequestHeader, Stream[F,Byte]) => Stream[F,HttpResponse[F]]
)(implicit AG: AsynchronousChannelGroup):Stream[F,Unit] = HttpServer(
)(implicit SG: SocketGroup):Stream[F,Unit] = HttpServer(
maxConcurrent = maxConcurrent
, receiveBufferSize = receiveBufferSize
, maxHeaderSize = maxHeaderSize
Expand All @@ -66,7 +66,7 @@ package object http {
, responseCodec: Codec[HttpResponseHeader] = HttpResponseHeaderCodec.defaultCodec
, sslStrategy: => ExecutionContext = ExecutionContext.fromExecutorService(Executors.newCachedThreadPool(util.mkThreadFactory("fs2-http-ssl", daemon = true)))
, sslContext: => SSLContext = { val ctx = SSLContext.getInstance("TLS"); ctx.init(null,null,null); ctx }
)(implicit AG: AsynchronousChannelGroup):F[HttpClient[F]] =
)(implicit SG: SocketGroup):F[HttpClient[F]] =
HttpClient(requestCodec, responseCodec, sslStrategy, sslContext)

}
2 changes: 1 addition & 1 deletion src/main/scala/spinoco/fs2/http/internal/internal.scala
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ package object internal {
eval(shallTimeout).flatMap { shallTimeout =>
if (!shallTimeout) socket.reads(chunkSize, None)
else {
if (remains <= 0.millis) Stream.raiseError(new TimeoutException())
if (remains <= 0.millis) Stream.raiseError[F](new TimeoutException())
else {
eval(Sync[F].delay(System.currentTimeMillis())).flatMap { start =>
eval(socket.read(chunkSize, Some(remains))).flatMap { read =>
Expand Down
10 changes: 5 additions & 5 deletions src/main/scala/spinoco/fs2/http/websocket/WebSocket.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package spinoco.fs2.http.websocket


import java.nio.channels.AsynchronousChannelGroup
import java.util.concurrent.Executors

import cats.Applicative
Expand All @@ -10,6 +9,7 @@ import cats.effect.{Concurrent, ConcurrentEffect, ContextShift, Timer}
import fs2.Chunk.ByteVectorChunk
import fs2._
import fs2.concurrent.Queue
import fs2.io.tcp.SocketGroup
import scodec.Attempt.{Failure, Successful}
import scodec.bits.ByteVector
import scodec.{Codec, Decoder, Encoder}
Expand Down Expand Up @@ -91,21 +91,21 @@ object WebSocket {
, responseCodec: Codec[HttpResponseHeader] = HttpResponseHeaderCodec.defaultCodec
, sslES: => ExecutionContext = ExecutionContext.fromExecutorService(Executors.newCachedThreadPool(spinoco.fs2.http.util.mkThreadFactory("fs2-http-ssl", daemon = true)))
, sslContext: => SSLContext = { val ctx = SSLContext.getInstance("TLS"); ctx.init(null,null,null); ctx }
)(implicit AG: AsynchronousChannelGroup): Stream[F, Option[HttpResponseHeader]] = {
)(implicit SG: SocketGroup): Stream[F, Option[HttpResponseHeader]] = {
import spinoco.fs2.http.internal._
import Stream._
eval(addressForRequest[F](if (request.secure) HttpScheme.WSS else HttpScheme.WS, request.hostPort)).flatMap { address =>
Stream.resource(io.tcp.client[F](address, receiveBufferSize = receiveBufferSize))
Stream.resource(SG.client[F](address, receiveBufferSize = receiveBufferSize))
.evalMap { socket => if (request.secure) clientLiftToSecure(sslES, sslContext)(socket, request.hostPort) else Applicative[F].pure(socket) }
.flatMap { socket =>
val (header, fingerprint) = impl.createRequestHeaders(request.header)
requestCodec.encode(header) match {
case Failure(err) => Stream.raiseError(new Throwable(s"Failed to encode websocket request: $err"))
case Failure(err) => Stream.raiseError[F](new Throwable(s"Failed to encode websocket request: $err"))
case Successful(headerBits) =>
eval(socket.write(ByteVectorChunk(headerBits.bytes ++ `\r\n\r\n`))).flatMap { _ =>
socket.reads(receiveBufferSize) through httpHeaderAndBody(maxHeaderSize) flatMap { case (respHeaderBytes, body) =>
responseCodec.decodeValue(respHeaderBytes.bits) match {
case Failure(err) => raiseError(new Throwable(s"Failed to decode websocket response: $err"))
case Failure(err) => raiseError[F](new Throwable(s"Failed to decode websocket response: $err"))
case Successful(responseHeader) =>
impl.validateResponse[F](header, responseHeader, fingerprint).flatMap {
case Some(resp) => emit(Some(resp))
Expand Down
2 changes: 1 addition & 1 deletion src/test/scala/spinoco/fs2/http/HttpServerSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ object HttpServerSpec extends Properties("HttpServer"){
if (request.path != Uri.Path / "echo") Stream.emit(HttpResponse[IO](HttpStatusCode.Ok).withUtf8Body("Hello World")).covary[IO]
else {
val ct = request.headers.collectFirst { case `Content-Type`(ct0) => ct0 }.getOrElse(ContentType.BinaryContent(MediaType.`application/octet-stream`, None))
val size = request.headers.collectFirst { case `Content-Length`(sz) => sz }.getOrElse(0l)
val size = request.headers.collectFirst { case `Content-Length`(sz) => sz }.getOrElse(0L)
val ok = HttpResponse(HttpStatusCode.Ok).chunkedEncoding.withContentType(ct).withBodySize(size)

Stream.emit(ok.copy(body = body.take(size)))
Expand Down
9 changes: 5 additions & 4 deletions src/test/scala/spinoco/fs2/http/Resources.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ package spinoco.fs2.http
import java.nio.channels.AsynchronousChannelGroup
import java.util.concurrent.Executors

import cats.effect.{Concurrent, ContextShift, IO, Timer}
import cats.effect.{Blocker, Concurrent, ContextShift, IO, Timer}
import fs2.io.tcp.SocketGroup

import scala.concurrent.ExecutionContext

Expand All @@ -13,7 +14,7 @@ object Resources {
implicit val _cxs: ContextShift[IO] = IO.contextShift(ExecutionContext.Implicits.global)
implicit val _timer: Timer[IO] = IO.timer(ExecutionContext.Implicits.global)
implicit val _concurrent: Concurrent[IO] = IO.ioConcurrentEffect(_cxs)
implicit val AG: AsynchronousChannelGroup = AsynchronousChannelGroup.withThreadPool(Executors.newCachedThreadPool(util.mkThreadFactory("fs2-http-spec-AG", daemon = true)))


lazy val AG: AsynchronousChannelGroup = AsynchronousChannelGroup.withThreadPool(Executors.newCachedThreadPool(util.mkThreadFactory("fs2-http-spec-AG", daemon = true)))
lazy val blocker: Blocker = Blocker.liftExecutionContext(ExecutionContext.global)
implicit val SG: SocketGroup = new SocketGroup(AG, blocker)
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ object HttpServerApp extends App {
if (request.path != Uri.Path / "echo") Stream.emit(HttpResponse[IO](HttpStatusCode.Ok).withUtf8Body("Hello World")).covary[IO]
else {
val ct = request.headers.collectFirst { case `Content-Type`(ct) => ct }.getOrElse(ContentType.BinaryContent(MediaType.`application/octet-stream`, None))
val size = request.headers.collectFirst { case `Content-Length`(sz) => sz }.getOrElse(0l)
val size = request.headers.collectFirst { case `Content-Length`(sz) => sz }.getOrElse(0L)
val ok = HttpResponse(HttpStatusCode.Ok).chunkedEncoding.withContentType(ct).withBodySize(size)

Stream.emit(ok.copy(body = body.take(size)))
Expand Down