Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion dom/src/main/scala/org/scalajs/dom/ReadableStream.scala
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
package org.scalajs.dom

import scala.scalajs.js
import scala.scalajs.js.annotation.JSGlobal

/** defined at [[https://streams.spec.whatwg.org/#readable-stream ¶2.1. Readable Streams]] of whatwg Streams spec.
*
* @tparam T
* Type of the Chunks returned by the Stream. Can't make it coveriant, due to T
*/
@js.native
trait ReadableStream[+T] extends js.Object {
@JSGlobal
class ReadableStream[+T](
underlyingSource: js.UndefOr[ReadableStreamUnderlyingSource[T]],
queuingStrategy: js.UndefOr[ReadableStreamQueuingStrategy[T]] = js.undefined
) extends js.Object {

/** The locked getter returns whether or not the readable stream is locked to a reader.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package org.scalajs.dom

import scala.scalajs.js
import scala.scalajs.js.annotation._

/** [[https://streams.spec.whatwg.org/#rs-controller-class ¶3.3 Class ReadableStreamController]] of whatwg spec
*
Expand All @@ -14,8 +13,7 @@ import scala.scalajs.js.annotation._
* Type of the Chunks to be enqueued to the Stream
*/
@js.native
@JSGlobal
class ReadableStreamController[-T](stream: ReadableStream[T] = null) extends js.Object {
trait ReadableStreamController[-T] extends js.Object {

/** The desiredSize getter returns the desired size to fill the controlled stream’s internal queue. It can be
* negative, if the queue is over-full. An underlying source should use this information to determine when and how to
Expand All @@ -39,7 +37,7 @@ class ReadableStreamController[-T](stream: ReadableStream[T] = null) extends js.
* @return
* seems like its an undefOr[Int] of the size
*/
def enqueue(chunk: Chunk[T]): js.UndefOr[Int] = js.native
def enqueue(chunk: T): js.UndefOr[Int] = js.native

/** The error method will error the readable stream, making all future interactions with it fail with the given error
* e.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package org.scalajs.dom

import scala.scalajs.js

/** See [[https://streams.spec.whatwg.org/#qs-api ¶7.1. The queuing strategy API]]
*
* @tparam T
* Type of the Chunks returned by the Stream
*/
trait ReadableStreamQueuingStrategy[T] extends js.Object {

/** A non-negative number indicating the high water mark of the stream using this queuing strategy. */
var highWaterMark: Double

/** (non-byte streams only)
*
* The result is used to determine backpressure, manifesting via the appropriate desiredSize property. For readable
* streams, it also governs when the underlying source's [[ReadableStreamUnderlyingSource.pull]] method is called.
*
* A function that computes and returns the finite non-negative size of the given chunk value.
*/
var size: js.Function1[Chunk[T], Unit]
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,8 @@ class ReadableStreamReader[+T](stream: ReadableStream[T]) extends js.Object {
*
* If the reader is active, the cancel method behaves the same as that for the associated stream. When done, it
* automatically releases the lock.
*
* //todo determine type of reason
*/
// not actually sure what the return type is here
def cancel(reason: Any): js.Promise[Any] = js.native
def cancel[U](reason: js.UndefOr[U]): js.Promise[U] = js.native

/** See [[https://streams.spec.whatwg.org/#reader-read 3.4.4.3. read()]] of whatwg Stream spec.
*
Expand Down
11 changes: 11 additions & 0 deletions dom/src/main/scala/org/scalajs/dom/ReadableStreamType.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package org.scalajs.dom

import scala.scalajs.js

/** [[https://streams.spec.whatwg.org/#enumdef-readablestreamtype ReadableStreamType enum]] */
@js.native
sealed trait ReadableStreamType extends js.Any

object ReadableStreamType {
val bytes: ReadableStreamType = "bytes".asInstanceOf[ReadableStreamType]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package org.scalajs.dom

import scala.scalajs.js
import scala.scalajs.js.|

/** See [[https://streams.spec.whatwg.org/#underlying-source-api ¶4.2.3. The underlying source API]] of whatwg streams
* spec.
*
* @tparam T
* Type of the Chunks returned by the Stream
*/
trait ReadableStreamUnderlyingSource[T] extends js.Object {

/** A function that is called immediately during creation of the ReadableStream.
*
* If this setup process is asynchronous, it can return a promise to signal success or failure; a rejected promise
* will error the stream. Any thrown exceptions will be re-thrown by the [[ReadableStream]] constructor.
*/
var start: js.UndefOr[js.Function1[ReadableStreamController[T], Unit | js.Promise[Unit]]] = js.undefined

/** A function that is called whenever the stream’s internal queue of chunks becomes not full, i.e. whenever the
* queue’s desired size becomes positive. Generally, it will be called repeatedly until the queue reaches its high
* water mark (i.e. until the desired size becomes non-positive).
*
* This function will not be called until [[start]] successfully completes. Additionally, it will only be called
* repeatedly if it enqueues at least one chunk or fulfills a BYOB request; a no-op [[pull]] implementation will not
* be continually called.
*
* If the function returns a promise, then it will not be called again until that promise fulfills. (If the promise
* rejects, the stream will become errored.) This is mainly used in the case of pull sources, where the promise
* returned represents the process of acquiring a new chunk. Throwing an exception is treated the same as returning a
* rejected promise.
*/
var pull: js.UndefOr[js.Function1[ReadableStreamController[T], Unit | js.Promise[Unit]]] = js.undefined

/** A function that is called whenever the consumer cancels the stream, via [[ReadableStream.cancel]] or
* [[ReadableStreamReader.cancel]]. It takes as its argument the same value as was passed to those methods by the
* consumer.
*
* If the shutdown process is asynchronous, it can return a promise to signal success or failure; the result will be
* communicated via the return value of the [[cancel]] method that was called. Additionally, a rejected promise will
* error the stream, instead of letting it close. Throwing an exception is treated the same as returning a rejected
* promise.
*/
var cancel: js.UndefOr[js.Function1[js.Any, Unit | js.Promise[Unit]]] = js.undefined

/** Can be set to "bytes" to signal that the constructed [[ReadableStream]] is a readable byte stream.
*
* Setting any value other than "bytes" or undefined will cause the ReadableStream() constructor to throw an
* exception.
*/
var `type`: js.UndefOr[ReadableStreamType] = js.undefined

/** (byte streams only)
*
* Can be set to a positive integer to cause the implementation to automatically allocate buffers for the underlying
* source code to write into.
*/
var autoAllocateChunkSize: js.UndefOr[Double] = js.undefined
}
4 changes: 2 additions & 2 deletions dom/src/main/scala/org/scalajs/dom/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package org.scalajs

import scala.scalajs.js
import scala.scalajs.js.annotation._
import scala.scalajs.js.typedarray.{ArrayBuffer, ArrayBufferView}
import scala.scalajs.js.typedarray.{ArrayBuffer, ArrayBufferView, Uint8Array}
import scala.scalajs.js.|

package object dom {
Expand Down Expand Up @@ -32,7 +32,7 @@ package object dom {

/** defined at [[https://fetch.spec.whatwg.org/#body-mixin ¶6.2 Body mixin]] in whatwg Fetch spec */
type BodyInit =
Blob | BufferSource | FormData | String // todo: add URLSearchParams
Blob | BufferSource | FormData | String | ReadableStream[Uint8Array] // todo: add URLSearchParams

/** WebIDL sequence<T> is js.Array[T] | JSIterable[T]. However @mseddon knows at least Blink's IDL compiler treats
* these as simply js.Array[T] for now. We keep this type as a reminder to check in more detail
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
package org.scalajs.dom.tests.shared

import org.scalajs.dom.tests.shared.AsyncTesting._
import org.junit.Assert.assertEquals
import org.junit.Test
import org.scalajs.dom.tests.shared.AsyncTesting._

import scala.concurrent.Future
import scala.scalajs.js
import scala.scalajs.js.Thenable.Implicits._
import scala.scalajs.js.|

trait SharedTests {

Expand Down Expand Up @@ -36,4 +42,42 @@ trait SharedTests {

@Test final def WindowIdbTest(): AsyncResult =
IdbTest(window.indexedDB)

@Test
final def ReadableStreamTest: AsyncResult = async {
case class Tuna(color: String)

val expectedTunas = Seq(
Tuna("blue"),
Tuna("red")
)

val stream = new ReadableStream[Tuna](
new ReadableStreamUnderlyingSource[Tuna] {
start = { controller: ReadableStreamController[Tuna] =>
controller.enqueue(Tuna("blue"))
controller.enqueue(Tuna("red"))
controller.close()
}.asInstanceOf[js.UndefOr[js.Function1[ReadableStreamController[Tuna], Unit | js.Promise[Unit]]]]
}
)

val reader = stream.getReader()

def read(tunas: Seq[Tuna]): Future[Seq[Tuna]] = {
reader
.read()
.flatMap { chunk =>
if (chunk.done) {
Future.successful(tunas)
} else {
read(tunas :+ chunk.value)
}
}
}
read(Seq.empty)
.map { receivedTunas =>
assertEquals(receivedTunas, expectedTunas)
}
}
}