Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
254 changes: 174 additions & 80 deletions faststreams/inputs.nim
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ type
FileInputStream = ref object of InputStream
file: File

VmInputStream = ref object of InputStream
data: seq[byte]
pos: int

template Sync*(s: InputStream): InputStream = s

when fsAsyncSupport:
Expand Down Expand Up @@ -119,6 +123,11 @@ when fsAsyncSupport:
template makeHandle*(sp: InputStream): InputStreamHandle =
InputStreamHandle(s: sp)

proc close(s: VmInputStream) =
if s == nil:
return
s.pos = s.data.len

proc close*(s: InputStream,
behavior = dontWaitAsyncClose)
{.raises: [IOError].} =
Expand All @@ -128,18 +137,21 @@ proc close*(s: InputStream,
## If the underlying input device requires asynchronous closing
## and `behavior` is set to `waitAsyncClose`, this proc will use
## `waitFor` to block until the async operation completes.
if s == nil:
return
when nimvm:
close(VmInputStream(s))
else:
if s == nil:
return

s.disconnectInputDevice()
s.preventFurtherReading()
when fsAsyncSupport:
if s.closeFut != nil:
fsTranslateErrors "Stream closing failed":
if behavior == waitAsyncClose:
waitFor s.closeFut
else:
asyncCheck s.closeFut
s.disconnectInputDevice()
s.preventFurtherReading()
when fsAsyncSupport:
if s.closeFut != nil:
fsTranslateErrors "Stream closing failed":
if behavior == waitAsyncClose:
waitFor s.closeFut
else:
asyncCheck s.closeFut

when fsAsyncSupport:
template close*(sp: AsyncInputStream) =
Expand Down Expand Up @@ -266,9 +278,12 @@ func getNewSpanOrDieTrying(s: InputStream) =
fsAssert s.span.hasRunway

func readableNow*(s: InputStream): bool =
if s.span.hasRunway: return true
getNewSpan s
s.span.hasRunway
when nimvm:
VmInputStream(s).pos < VmInputStream(s).data.len
else:
if s.span.hasRunway: return true
getNewSpan s
s.span.hasRunway

when fsAsyncSupport:
template readableNow*(s: AsyncInputStream): bool =
Expand Down Expand Up @@ -323,20 +338,26 @@ func getBestContiguousRunway(s: InputStream): Natural =
getNewSpan s
result = s.span.len

func totalUnconsumedBytes(s: VmInputStream): Natural =
s.data.len - s.pos

func totalUnconsumedBytes*(s: InputStream): Natural =
## Returns the number of bytes that are currently sitting within the stream
## buffers and that can be consumed with `read` or `advance`.
let
localRunway = s.span.len
runwayInBuffers =
if s.maxBufferedBytes.isSome():
s.maxBufferedBytes.get()
elif s.buffers != nil:
s.buffers.consumable()
else:
0
when nimvm:
totalUnconsumedBytes(VmInputStream(s))
else:
let
localRunway = s.span.len
runwayInBuffers =
if s.maxBufferedBytes.isSome():
s.maxBufferedBytes.get()
elif s.buffers != nil:
s.buffers.consumable()
else:
0

localRunway + runwayInBuffers
localRunway + runwayInBuffers

proc prepareReadableRange(s: InputStream, rangeLen: Natural): auto =
let
Expand Down Expand Up @@ -461,24 +482,34 @@ func unsafeMemoryInput*(mem: openArray[byte]): InputStreamHandle =
## the instance even though scope-wise, it looks like it should stay alive.
##
## See also https://github.com/nim-lang/Nim/issues/25080
let head = cast[ptr byte](mem)
when nimvm:
makeHandle VmInputStream(data: @mem, pos: 0)
else:
let head = cast[ptr byte](mem)

makeHandle InputStream(
span: PageSpan(
startAddr: head,
endAddr: offset(head, mem.len)),
spanEndPos: mem.len)
makeHandle InputStream(
span: PageSpan(
startAddr: head,
endAddr: offset(head, mem.len)),
spanEndPos: mem.len)

func unsafeMemoryInput*(str: string): InputStreamHandle =
unsafeMemoryInput str.toOpenArrayByte(0, str.len - 1)

proc len(s: VmInputStream): Option[Natural] =
doAssert s.data.len - s.pos >= 0
some(Natural(s.data.len - s.pos))

proc len*(s: InputStream): Option[Natural] {.raises: [IOError].} =
if s.vtable == nil:
some s.totalUnconsumedBytes
elif s.vtable.getLenSync != nil:
s.vtable.getLenSync(s)
when nimvm:
len(VmInputStream(s))
else:
none Natural
if s.vtable == nil:
some s.totalUnconsumedBytes
elif s.vtable.getLenSync != nil:
s.vtable.getLenSync(s)
else:
none Natural

when fsAsyncSupport:
template len*(s: AsyncInputStream): Option[Natural] =
Expand All @@ -488,15 +519,18 @@ func memoryInput*(buffers: PageBuffers): InputStreamHandle =
makeHandle InputStream(buffers: buffers)

func memoryInput*(data: openArray[byte]): InputStreamHandle =
let stream = if data.len > 0:
let buffers = PageBuffers.init(data.len)
buffers.write(data)

InputStream(buffers: buffers)
when nimvm:
makeHandle VmInputStream(data: @data, pos: 0)
else:
InputStream()
let stream = if data.len > 0:
let buffers = PageBuffers.init(data.len)
buffers.write(data)

InputStream(buffers: buffers)
else:
InputStream()

makeHandle stream
makeHandle stream

func memoryInput*(data: openArray[char]): InputStreamHandle =
memoryInput data.toOpenArrayByte(0, data.high())
Expand Down Expand Up @@ -598,8 +632,11 @@ template readable*(sp: InputStream): bool =
# This is a template, because we want the pointer check to be
# inlined at the call sites. Only if it fails, we call into the
# larger non-inlined proc:
let s = sp
hasRunway(s.span) or bufferMoreDataSync(s)
when nimvm:
VmInputStream(sp).pos < VmInputStream(sp).data.len
else:
let s = sp
hasRunway(s.span) or bufferMoreDataSync(s)

when fsAsyncSupport:
template readable*(sp: AsyncInputStream): bool =
Expand Down Expand Up @@ -678,7 +715,10 @@ proc readable*(s: InputStream, n: int): bool =
## Just like `readable`, this operation will invoke reads on the
## stream input device only when necessary. See `Stream Pages`
## for futher discussion of this.
readableNImpl(s, n, noAwait, readSync)
when nimvm:
VmInputStream(s).pos + n <= VmInputStream(s).data.len
else:
readableNImpl(s, n, noAwait, readSync)

when fsAsyncSupport:
template readable*(sp: AsyncInputStream, np: int): bool =
Expand All @@ -691,13 +731,20 @@ when fsAsyncSupport:

readableNImpl(s, n, fsAwait, readAsync)

template peek(s: VmInputStream): byte =
doAssert s.pos < s.data.len
s.data[s.pos]

template peek*(sp: InputStream): byte =
let s = sp
if hasRunway(s.span):
s.span.startAddr[]
when nimvm:
peek(VmInputStream(sp))
else:
getNewSpanOrDieTrying s
s.span.startAddr[]
let s = sp
if hasRunway(s.span):
s.span.startAddr[]
else:
getNewSpanOrDieTrying s
s.span.startAddr[]

when fsAsyncSupport:
template peek*(s: AsyncInputStream): byte =
Expand All @@ -707,42 +754,64 @@ func readFromNewSpan(s: InputStream): byte =
getNewSpanOrDieTrying s
s.span.read()

template read(s: VmInputStream): byte =
doAssert s.pos < s.data.len
inc s.pos
s.data[s.pos-1]

template read*(sp: InputStream): byte =
let s = sp
if hasRunway(s.span):
s.span.read()
when nimvm:
read(VmInputStream(sp))
else:
readFromNewSpan s
let s = sp
if hasRunway(s.span):
s.span.read()
else:
readFromNewSpan s

when fsAsyncSupport:
template read*(s: AsyncInputStream): byte =
read InputStream(s)

func peekAt(s: VmInputStream, pos: int): byte =
doAssert s.pos + pos < s.data.len
s.data[s.pos + pos]

func peekAt*(s: InputStream, pos: int): byte {.inline.} =
let runway = s.span.len
if pos < runway:
let peekHead = offset(s.span.startAddr, pos)
return peekHead[]
when nimvm:
return peekAt(VmInputStream(s), pos)
else:
let runway = s.span.len
if pos < runway:
let peekHead = offset(s.span.startAddr, pos)
return peekHead[]

if s.buffers != nil:
var p = pos - runway
for page in s.buffers.queue:
if p < page.len():
return page.data()[p]
p -= page.len()
if s.buffers != nil:
var p = pos - runway
for page in s.buffers.queue:
if p < page.len():
return page.data()[p]
p -= page.len()

fsAssert false,
"peeking past readable position pos=" & $pos & " readable = " & $s.totalUnconsumedBytes()
fsAssert false,
"peeking past readable position pos=" & $pos & " readable = " & $s.totalUnconsumedBytes()

when fsAsyncSupport:
template peekAt*(s: AsyncInputStream, pos: int): byte =
peekAt InputStream(s), pos

func advance(s: VmInputStream) =
doAssert s.pos < s.data.len
inc s.pos

func advance*(s: InputStream) =
if s.span.atEnd:
getNewSpanOrDieTrying s
when nimvm:
advance(VmInputStream(s))
else:
if s.span.atEnd:
getNewSpanOrDieTrying s

s.span.advance()
s.span.advance()

func advance*(s: InputStream, n: Natural) =
# TODO This is silly, implement it properly
Expand Down Expand Up @@ -813,19 +882,31 @@ template readIntoExImpl(s: InputStream,

dstLen - bytesDeficit

proc readIntoEx(s: VmInputStream, dst: var openArray[byte]): int =
result = 0
for i in 0 ..< dst.len:
if s.pos >= s.data.len:
break
dst[i] = s.data[s.pos]
inc s.pos
inc result

proc readIntoEx*(s: InputStream, dst: var openArray[byte]): int =
## Read data into the destination buffer.
##
## Returns the number of bytes that were successfully
## written to the buffer. The function will return a
## number smaller than the buffer length only if EOF
## was reached before the buffer was fully populated.
if dst.len > 0:
let dstAddr = baseAddr dst
let dstLen = dst.len
readIntoExImpl(s, dstAddr, dstLen, noAwait, readSync)
when nimvm:
readIntoEx(VmInputStream(s), dst)
else:
0
if dst.len > 0:
let dstAddr = baseAddr dst
let dstLen = dst.len
readIntoExImpl(s, dstAddr, dstLen, noAwait, readSync)
else:
0

proc readInto*(s: InputStream, target: var openArray[byte]): bool =
## Read data into the destination buffer.
Expand Down Expand Up @@ -900,15 +981,25 @@ template readNImpl(sp: InputStream,

makeOpenArray(startAddr, n)

template read(s: VmInputStream, n: Natural): openArray[byte] =
doAssert s.pos + n - 1 <= s.data.len, "not enough data to read"
toOpenArray(s.data, s.pos, s.pos + n - 1)

template read*(sp: InputStream, np: static Natural): openArray[byte] =
const n = np
when n < maxStackUsage:
readNImpl(sp, n, MemAllocType.StackMem)
when nimvm:
read(VmInputStream(sp), np)
else:
readNImpl(sp, n, MemAllocType.HeapMem)
const n = np
when n < maxStackUsage:
readNImpl(sp, n, MemAllocType.StackMem)
else:
readNImpl(sp, n, MemAllocType.HeapMem)

template read*(s: InputStream, n: Natural): openArray[byte] =
readNImpl(s, n, MemAllocType.HeapMem)
when nimvm:
read(VmInputStream(s), n)
else:
readNImpl(s, n, MemAllocType.HeapMem)

when fsAsyncSupport:
template read*(s: AsyncInputStream, n: Natural): openArray[byte] =
Expand Down Expand Up @@ -938,7 +1029,10 @@ when fsAsyncSupport:
none byte

func pos*(s: InputStream): int {.inline.} =
s.spanEndPos - s.span.len
when nimvm:
VmInputStream(s).pos
else:
s.spanEndPos - s.span.len

when fsAsyncSupport:
template pos*(s: AsyncInputStream): int =
Expand Down
Loading
Loading