Skip to content

Commit 8a94d6f

Browse files
authored
Add compile-time support for reading/writing simple streams (fixes #77) (#82)
- Implements compile-time/vm support for memory inputs/outputs - Changes writeText for floats so the output is the same across nim targets and versions.
1 parent a9c6b88 commit 8a94d6f

File tree

7 files changed

+502
-137
lines changed

7 files changed

+502
-137
lines changed

faststreams/inputs.nim

Lines changed: 174 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,10 @@ type
7777
FileInputStream = ref object of InputStream
7878
file: File
7979

80+
VmInputStream = ref object of InputStream
81+
data: seq[byte]
82+
pos: int
83+
8084
template Sync*(s: InputStream): InputStream = s
8185

8286
when fsAsyncSupport:
@@ -119,6 +123,11 @@ when fsAsyncSupport:
119123
template makeHandle*(sp: InputStream): InputStreamHandle =
120124
InputStreamHandle(s: sp)
121125

126+
proc close(s: VmInputStream) =
127+
if s == nil:
128+
return
129+
s.pos = s.data.len
130+
122131
proc close*(s: InputStream,
123132
behavior = dontWaitAsyncClose)
124133
{.raises: [IOError].} =
@@ -128,18 +137,21 @@ proc close*(s: InputStream,
128137
## If the underlying input device requires asynchronous closing
129138
## and `behavior` is set to `waitAsyncClose`, this proc will use
130139
## `waitFor` to block until the async operation completes.
131-
if s == nil:
132-
return
140+
when nimvm:
141+
close(VmInputStream(s))
142+
else:
143+
if s == nil:
144+
return
133145

134-
s.disconnectInputDevice()
135-
s.preventFurtherReading()
136-
when fsAsyncSupport:
137-
if s.closeFut != nil:
138-
fsTranslateErrors "Stream closing failed":
139-
if behavior == waitAsyncClose:
140-
waitFor s.closeFut
141-
else:
142-
asyncCheck s.closeFut
146+
s.disconnectInputDevice()
147+
s.preventFurtherReading()
148+
when fsAsyncSupport:
149+
if s.closeFut != nil:
150+
fsTranslateErrors "Stream closing failed":
151+
if behavior == waitAsyncClose:
152+
waitFor s.closeFut
153+
else:
154+
asyncCheck s.closeFut
143155

144156
when fsAsyncSupport:
145157
template close*(sp: AsyncInputStream) =
@@ -266,9 +278,12 @@ func getNewSpanOrDieTrying(s: InputStream) =
266278
fsAssert s.span.hasRunway
267279

268280
func readableNow*(s: InputStream): bool =
269-
if s.span.hasRunway: return true
270-
getNewSpan s
271-
s.span.hasRunway
281+
when nimvm:
282+
VmInputStream(s).pos < VmInputStream(s).data.len
283+
else:
284+
if s.span.hasRunway: return true
285+
getNewSpan s
286+
s.span.hasRunway
272287

273288
when fsAsyncSupport:
274289
template readableNow*(s: AsyncInputStream): bool =
@@ -323,20 +338,26 @@ func getBestContiguousRunway(s: InputStream): Natural =
323338
getNewSpan s
324339
result = s.span.len
325340

341+
func totalUnconsumedBytes(s: VmInputStream): Natural =
342+
s.data.len - s.pos
343+
326344
func totalUnconsumedBytes*(s: InputStream): Natural =
327345
## Returns the number of bytes that are currently sitting within the stream
328346
## buffers and that can be consumed with `read` or `advance`.
329-
let
330-
localRunway = s.span.len
331-
runwayInBuffers =
332-
if s.maxBufferedBytes.isSome():
333-
s.maxBufferedBytes.get()
334-
elif s.buffers != nil:
335-
s.buffers.consumable()
336-
else:
337-
0
347+
when nimvm:
348+
totalUnconsumedBytes(VmInputStream(s))
349+
else:
350+
let
351+
localRunway = s.span.len
352+
runwayInBuffers =
353+
if s.maxBufferedBytes.isSome():
354+
s.maxBufferedBytes.get()
355+
elif s.buffers != nil:
356+
s.buffers.consumable()
357+
else:
358+
0
338359

339-
localRunway + runwayInBuffers
360+
localRunway + runwayInBuffers
340361

341362
proc prepareReadableRange(s: InputStream, rangeLen: Natural): auto =
342363
let
@@ -461,24 +482,34 @@ func unsafeMemoryInput*(mem: openArray[byte]): InputStreamHandle =
461482
## the instance even though scope-wise, it looks like it should stay alive.
462483
##
463484
## See also https://github.com/nim-lang/Nim/issues/25080
464-
let head = cast[ptr byte](mem)
485+
when nimvm:
486+
makeHandle VmInputStream(data: @mem, pos: 0)
487+
else:
488+
let head = cast[ptr byte](mem)
465489

466-
makeHandle InputStream(
467-
span: PageSpan(
468-
startAddr: head,
469-
endAddr: offset(head, mem.len)),
470-
spanEndPos: mem.len)
490+
makeHandle InputStream(
491+
span: PageSpan(
492+
startAddr: head,
493+
endAddr: offset(head, mem.len)),
494+
spanEndPos: mem.len)
471495

472496
func unsafeMemoryInput*(str: string): InputStreamHandle =
473497
unsafeMemoryInput str.toOpenArrayByte(0, str.len - 1)
474498

499+
proc len(s: VmInputStream): Option[Natural] =
500+
doAssert s.data.len - s.pos >= 0
501+
some(Natural(s.data.len - s.pos))
502+
475503
proc len*(s: InputStream): Option[Natural] {.raises: [IOError].} =
476-
if s.vtable == nil:
477-
some s.totalUnconsumedBytes
478-
elif s.vtable.getLenSync != nil:
479-
s.vtable.getLenSync(s)
504+
when nimvm:
505+
len(VmInputStream(s))
480506
else:
481-
none Natural
507+
if s.vtable == nil:
508+
some s.totalUnconsumedBytes
509+
elif s.vtable.getLenSync != nil:
510+
s.vtable.getLenSync(s)
511+
else:
512+
none Natural
482513

483514
when fsAsyncSupport:
484515
template len*(s: AsyncInputStream): Option[Natural] =
@@ -488,15 +519,18 @@ func memoryInput*(buffers: PageBuffers): InputStreamHandle =
488519
makeHandle InputStream(buffers: buffers)
489520

490521
func memoryInput*(data: openArray[byte]): InputStreamHandle =
491-
let stream = if data.len > 0:
492-
let buffers = PageBuffers.init(data.len)
493-
buffers.write(data)
494-
495-
InputStream(buffers: buffers)
522+
when nimvm:
523+
makeHandle VmInputStream(data: @data, pos: 0)
496524
else:
497-
InputStream()
525+
let stream = if data.len > 0:
526+
let buffers = PageBuffers.init(data.len)
527+
buffers.write(data)
528+
529+
InputStream(buffers: buffers)
530+
else:
531+
InputStream()
498532

499-
makeHandle stream
533+
makeHandle stream
500534

501535
func memoryInput*(data: openArray[char]): InputStreamHandle =
502536
memoryInput data.toOpenArrayByte(0, data.high())
@@ -598,8 +632,11 @@ template readable*(sp: InputStream): bool =
598632
# This is a template, because we want the pointer check to be
599633
# inlined at the call sites. Only if it fails, we call into the
600634
# larger non-inlined proc:
601-
let s = sp
602-
hasRunway(s.span) or bufferMoreDataSync(s)
635+
when nimvm:
636+
VmInputStream(sp).pos < VmInputStream(sp).data.len
637+
else:
638+
let s = sp
639+
hasRunway(s.span) or bufferMoreDataSync(s)
603640

604641
when fsAsyncSupport:
605642
template readable*(sp: AsyncInputStream): bool =
@@ -678,7 +715,10 @@ proc readable*(s: InputStream, n: int): bool =
678715
## Just like `readable`, this operation will invoke reads on the
679716
## stream input device only when necessary. See `Stream Pages`
680717
## for futher discussion of this.
681-
readableNImpl(s, n, noAwait, readSync)
718+
when nimvm:
719+
VmInputStream(s).pos + n <= VmInputStream(s).data.len
720+
else:
721+
readableNImpl(s, n, noAwait, readSync)
682722

683723
when fsAsyncSupport:
684724
template readable*(sp: AsyncInputStream, np: int): bool =
@@ -691,13 +731,20 @@ when fsAsyncSupport:
691731

692732
readableNImpl(s, n, fsAwait, readAsync)
693733

734+
template peek(s: VmInputStream): byte =
735+
doAssert s.pos < s.data.len
736+
s.data[s.pos]
737+
694738
template peek*(sp: InputStream): byte =
695-
let s = sp
696-
if hasRunway(s.span):
697-
s.span.startAddr[]
739+
when nimvm:
740+
peek(VmInputStream(sp))
698741
else:
699-
getNewSpanOrDieTrying s
700-
s.span.startAddr[]
742+
let s = sp
743+
if hasRunway(s.span):
744+
s.span.startAddr[]
745+
else:
746+
getNewSpanOrDieTrying s
747+
s.span.startAddr[]
701748

702749
when fsAsyncSupport:
703750
template peek*(s: AsyncInputStream): byte =
@@ -707,42 +754,64 @@ func readFromNewSpan(s: InputStream): byte =
707754
getNewSpanOrDieTrying s
708755
s.span.read()
709756

757+
template read(s: VmInputStream): byte =
758+
doAssert s.pos < s.data.len
759+
inc s.pos
760+
s.data[s.pos-1]
761+
710762
template read*(sp: InputStream): byte =
711-
let s = sp
712-
if hasRunway(s.span):
713-
s.span.read()
763+
when nimvm:
764+
read(VmInputStream(sp))
714765
else:
715-
readFromNewSpan s
766+
let s = sp
767+
if hasRunway(s.span):
768+
s.span.read()
769+
else:
770+
readFromNewSpan s
716771

717772
when fsAsyncSupport:
718773
template read*(s: AsyncInputStream): byte =
719774
read InputStream(s)
720775

776+
func peekAt(s: VmInputStream, pos: int): byte =
777+
doAssert s.pos + pos < s.data.len
778+
s.data[s.pos + pos]
779+
721780
func peekAt*(s: InputStream, pos: int): byte {.inline.} =
722-
let runway = s.span.len
723-
if pos < runway:
724-
let peekHead = offset(s.span.startAddr, pos)
725-
return peekHead[]
781+
when nimvm:
782+
return peekAt(VmInputStream(s), pos)
783+
else:
784+
let runway = s.span.len
785+
if pos < runway:
786+
let peekHead = offset(s.span.startAddr, pos)
787+
return peekHead[]
726788

727-
if s.buffers != nil:
728-
var p = pos - runway
729-
for page in s.buffers.queue:
730-
if p < page.len():
731-
return page.data()[p]
732-
p -= page.len()
789+
if s.buffers != nil:
790+
var p = pos - runway
791+
for page in s.buffers.queue:
792+
if p < page.len():
793+
return page.data()[p]
794+
p -= page.len()
733795

734-
fsAssert false,
735-
"peeking past readable position pos=" & $pos & " readable = " & $s.totalUnconsumedBytes()
796+
fsAssert false,
797+
"peeking past readable position pos=" & $pos & " readable = " & $s.totalUnconsumedBytes()
736798

737799
when fsAsyncSupport:
738800
template peekAt*(s: AsyncInputStream, pos: int): byte =
739801
peekAt InputStream(s), pos
740802

803+
func advance(s: VmInputStream) =
804+
doAssert s.pos < s.data.len
805+
inc s.pos
806+
741807
func advance*(s: InputStream) =
742-
if s.span.atEnd:
743-
getNewSpanOrDieTrying s
808+
when nimvm:
809+
advance(VmInputStream(s))
810+
else:
811+
if s.span.atEnd:
812+
getNewSpanOrDieTrying s
744813

745-
s.span.advance()
814+
s.span.advance()
746815

747816
func advance*(s: InputStream, n: Natural) =
748817
# TODO This is silly, implement it properly
@@ -813,19 +882,31 @@ template readIntoExImpl(s: InputStream,
813882

814883
dstLen - bytesDeficit
815884

885+
proc readIntoEx(s: VmInputStream, dst: var openArray[byte]): int =
886+
result = 0
887+
for i in 0 ..< dst.len:
888+
if s.pos >= s.data.len:
889+
break
890+
dst[i] = s.data[s.pos]
891+
inc s.pos
892+
inc result
893+
816894
proc readIntoEx*(s: InputStream, dst: var openArray[byte]): int =
817895
## Read data into the destination buffer.
818896
##
819897
## Returns the number of bytes that were successfully
820898
## written to the buffer. The function will return a
821899
## number smaller than the buffer length only if EOF
822900
## was reached before the buffer was fully populated.
823-
if dst.len > 0:
824-
let dstAddr = baseAddr dst
825-
let dstLen = dst.len
826-
readIntoExImpl(s, dstAddr, dstLen, noAwait, readSync)
901+
when nimvm:
902+
readIntoEx(VmInputStream(s), dst)
827903
else:
828-
0
904+
if dst.len > 0:
905+
let dstAddr = baseAddr dst
906+
let dstLen = dst.len
907+
readIntoExImpl(s, dstAddr, dstLen, noAwait, readSync)
908+
else:
909+
0
829910

830911
proc readInto*(s: InputStream, target: var openArray[byte]): bool =
831912
## Read data into the destination buffer.
@@ -900,15 +981,25 @@ template readNImpl(sp: InputStream,
900981

901982
makeOpenArray(startAddr, n)
902983

984+
template read(s: VmInputStream, n: Natural): openArray[byte] =
985+
doAssert s.pos + n - 1 <= s.data.len, "not enough data to read"
986+
toOpenArray(s.data, s.pos, s.pos + n - 1)
987+
903988
template read*(sp: InputStream, np: static Natural): openArray[byte] =
904-
const n = np
905-
when n < maxStackUsage:
906-
readNImpl(sp, n, MemAllocType.StackMem)
989+
when nimvm:
990+
read(VmInputStream(sp), np)
907991
else:
908-
readNImpl(sp, n, MemAllocType.HeapMem)
992+
const n = np
993+
when n < maxStackUsage:
994+
readNImpl(sp, n, MemAllocType.StackMem)
995+
else:
996+
readNImpl(sp, n, MemAllocType.HeapMem)
909997

910998
template read*(s: InputStream, n: Natural): openArray[byte] =
911-
readNImpl(s, n, MemAllocType.HeapMem)
999+
when nimvm:
1000+
read(VmInputStream(s), n)
1001+
else:
1002+
readNImpl(s, n, MemAllocType.HeapMem)
9121003

9131004
when fsAsyncSupport:
9141005
template read*(s: AsyncInputStream, n: Natural): openArray[byte] =
@@ -938,7 +1029,10 @@ when fsAsyncSupport:
9381029
none byte
9391030

9401031
func pos*(s: InputStream): int {.inline.} =
941-
s.spanEndPos - s.span.len
1032+
when nimvm:
1033+
VmInputStream(s).pos
1034+
else:
1035+
s.spanEndPos - s.span.len
9421036

9431037
when fsAsyncSupport:
9441038
template pos*(s: AsyncInputStream): int =

0 commit comments

Comments
 (0)