Skip to content

Commit a19dbdf

Browse files
committed
adds directory downloader
1 parent d10c094 commit a19dbdf

File tree

1 file changed

+239
-0
lines changed

1 file changed

+239
-0
lines changed
Lines changed: 239 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,239 @@
1+
## Nim-Codex
2+
## Copyright (c) 2025 Status Research & Development GmbH
3+
## Licensed under either of
4+
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
5+
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
6+
## at your option.
7+
## This file may not be copied, modified, or distributed except according to
8+
## those terms.
9+
10+
{.push raises: [].}
11+
12+
import std/os
13+
import std/times
14+
import std/strutils
15+
import std/sequtils
16+
import std/sugar
17+
import pkg/chronos
18+
import pkg/libp2p/[cid, multihash]
19+
import pkg/libp2p/stream/lpstream
20+
import pkg/questionable/results
21+
import pkg/stew/byteutils
22+
23+
import ../node
24+
import ../logutils
25+
import ../utils/iter
26+
import ../utils/safeasynciter
27+
import ../utils/trackedfutures
28+
import ../errors
29+
import ../manifest
30+
import ../blocktype
31+
import ../stores/blockstore
32+
33+
import ./tarballs
34+
import ./directorymanifest
35+
import ./decoding
36+
37+
logScope:
38+
topics = "codex node directorydownloader"
39+
40+
type
41+
DirectoryDownloader* = ref object
42+
node: CodexNodeRef
43+
queue*: AsyncQueue[seq[byte]]
44+
finished: bool
45+
trackedFutures: TrackedFutures
46+
47+
proc printQueue(self: DirectoryDownloader) =
48+
echo "Queue: ", self.queue.len, " entries"
49+
for i in 0 ..< self.queue.len:
50+
echo "Entry ", i, ": ", self.queue[i].len, " bytes"
51+
52+
proc createEntryHeader(self: DirectoryDownloader, entry: TarballEntry, basePath: string): string =
53+
echo "Creating entry header for ", entry.name
54+
echo "basePath = ", basePath
55+
echo "entry = ", entry
56+
result = newStringOfCap(512)
57+
result.add(entry.name)
58+
result.setLen(100)
59+
# ToDo: read permissions from the TarballEntry
60+
if entry.kind == ekDirectory:
61+
result.add("000755 \0") # Dir mode
62+
else:
63+
result.add("000644 \0") # File mode
64+
result.add(toOct(0, 6) & " \0") # Owner's numeric user ID
65+
result.add(toOct(0, 6) & " \0") # Group's numeric user ID
66+
result.add(toOct(entry.contentLength, 11) & ' ') # File size
67+
result.add(toOct(entry.lastModified.toUnix(), 11) & ' ') # Last modified time
68+
result.add(" ") # Empty checksum for now
69+
result.setLen(156)
70+
result.add(ord(entry.kind).char)
71+
result.setLen(257)
72+
result.add("ustar\0") # UStar indicator
73+
result.add(toOct(0, 2)) # UStar version
74+
result.setLen(329)
75+
result.add(toOct(0, 6) & "\0 ") # Device major number
76+
result.add(toOct(0, 6) & "\0 ") # Device minor number
77+
result.add(basePath)
78+
result.setLen(512)
79+
80+
var checksum: int
81+
for i in 0 ..< result.len:
82+
checksum += result[i].int
83+
84+
let checksumStr = toOct(checksum, 6) & '\0'
85+
for i in 0 ..< checksumStr.len:
86+
result[148 + i] = checksumStr[i]
87+
88+
proc fetchTarball(self: DirectoryDownloader,
89+
cid: Cid, basePath = "",
90+
): Future[?!void] {.async: (raises: [CancelledError]).} =
91+
echo "fetchTarball: ", cid, " basePath = ", basePath
92+
# we got a Cid - let's check if this is a manifest (can be either
93+
# a directory or file manifest)
94+
without isM =? cid.isManifest, err:
95+
warn "Unable to determine if cid is a manifest"
96+
return failure("Unable to determine if cid is a manifest")
97+
98+
if not isM:
99+
# this is not a manifest, so we can return
100+
return failure("given cid is not a manifest: " & $cid)
101+
102+
# get the manifest
103+
without blk =? await self.node.blockStore.getBlock(cid), err:
104+
error "Error retrieving manifest block", cid, err = err.msg
105+
return failure("Error retrieving manifest block (cid = " & $cid & "), err = " & err.msg)
106+
107+
without manifest =? Manifest.decode(blk), err:
108+
info "Unable to decode as manifest - trying to decode as directory manifest", err = err.msg
109+
# Try if it not a directory manifest
110+
without manifest =? DirectoryManifest.decode(blk), err:
111+
error "Unable to decode as directory manifest", err = err.msg
112+
return failure("Unable to decode as valid manifest (cid = " & $cid & ")")
113+
# this is a directory manifest
114+
echo "Decoded directory manifest: ", $manifest
115+
let dirEntry = TarballEntry(
116+
kind: ekDirectory,
117+
name: manifest.name,
118+
lastModified: getTime(), # ToDo: store actual time in the manifest
119+
permissions: parseFilePermissions(cast[uint32](0o755)), # same here
120+
contentLength: 0
121+
)
122+
let header = self.createEntryHeader(dirEntry, basePath)
123+
echo "header = ", header
124+
await self.queue.addLast(header.toBytes())
125+
self.printQueue()
126+
var entryLength = header.len
127+
let alignedEntryLength = (entryLength + 511) and not 511 # 512 byte aligned
128+
if alignedEntryLength - entryLength > 0:
129+
echo "Adding ", alignedEntryLength - entryLength, " bytes of padding"
130+
var data = newSeq[byte]()
131+
data.setLen(alignedEntryLength - entryLength)
132+
await self.queue.addLast(data)
133+
self.printQueue()
134+
135+
for cid in manifest.cids:
136+
echo "fetching directory content: ", cid
137+
if err =? (await self.fetchTarball(cid, basePath / manifest.name)).errorOption:
138+
error "Error fetching directory content", cid, path = basePath / manifest.name, err = err.msg
139+
return failure("Error fetching directory content (cid = " & $cid & "), err = " & err.msg)
140+
echo "fetchTarball[DIR]: ", cid, " basePath = ", basePath, " done"
141+
return success()
142+
143+
# this is a regular file (Codex) manifest
144+
echo "Decoded file manifest: ", $manifest
145+
let fileEntry = TarballEntry(
146+
kind: ekNormalFile,
147+
name: manifest.filename |? "unknown",
148+
lastModified: getTime(), # ToDo: store actual time in the manifest
149+
permissions: parseFilePermissions(cast[uint32](0o644)), # same here
150+
contentLength: manifest.datasetSize.int,
151+
)
152+
let header = self.createEntryHeader(fileEntry, basePath)
153+
await self.queue.addLast(header.toBytes())
154+
self.printQueue()
155+
var contentLength = 0
156+
157+
proc onBatch(blocks: seq[Block]): Future[?!void] {.async: (raises: [CancelledError]).} =
158+
echo "onBatch: ", blocks.len, " blocks"
159+
for blk in blocks:
160+
echo "onBatch[blk.data]: ", string.fromBytes(blk.data)
161+
# await self.queue.addLast(string.fromBytes(blk.data))
162+
await self.queue.addLast(blk.data)
163+
self.printQueue()
164+
contentLength += blk.data.len
165+
# this can happen if the content was stored with padding
166+
if contentLength > manifest.datasetSize.int:
167+
contentLength = manifest.datasetSize.int
168+
echo "onBatch[contentLength]: ", contentLength
169+
success()
170+
171+
await self.node.fetchDatasetAsync(manifest, fetchLocal = true, onBatch = onBatch)
172+
173+
echo "contentLength: ", contentLength
174+
echo "manifest.datasetSize.int: ", manifest.datasetSize.int
175+
if contentLength != manifest.datasetSize.int:
176+
echo "Warning: entry length mismatch, expected ", manifest.datasetSize.int, " got ", contentLength
177+
178+
let entryLength = header.len + contentLength
179+
let alignedEntryLength = (entryLength + 511) and not 511 # 512 byte aligned
180+
if alignedEntryLength - entryLength > 0:
181+
echo "Adding ", alignedEntryLength - entryLength, " bytes of padding"
182+
var data = newSeq[byte]()
183+
echo "alignedEntryLength: ", alignedEntryLength
184+
echo "entryLength: ", entryLength
185+
echo "alignedEntryLength - entryLength: ", alignedEntryLength - entryLength
186+
data.setLen(alignedEntryLength - entryLength)
187+
echo "data.len: ", data.len
188+
await self.queue.addLast(data)
189+
self.printQueue()
190+
echo "fetchTarball: ", cid, " basePath = ", basePath, " done"
191+
return success()
192+
193+
proc streamDirectory(self: DirectoryDownloader,
194+
cid: Cid
195+
): Future[void] {.async: (raises: []).} =
196+
try:
197+
if err =? (await self.fetchTarball(cid, basePath = "")).errorOption:
198+
error "Error fetching directory content", cid, err = err.msg
199+
return
200+
# Two consecutive zero-filled records at end
201+
var data = newSeq[byte]()
202+
data.setLen(1024)
203+
await self.queue.addLast(data)
204+
self.printQueue()
205+
# mark the end of the stream
206+
self.finished = true
207+
echo "streamDirectory: ", cid, " done"
208+
except CancelledError:
209+
info "Streaming directory cancelled:", cid
210+
211+
###########################################################################
212+
# Public API
213+
###########################################################################
214+
215+
proc start*(self: DirectoryDownloader, cid: Cid) =
216+
## Starts streaming the directory content
217+
self.trackedFutures.track(self.streamDirectory(cid))
218+
219+
proc stop*(self: DirectoryDownloader) {.async: (raises: []).} =
220+
await noCancel self.trackedFutures.cancelTracked()
221+
222+
proc getNext*(self: DirectoryDownloader): Future[seq[byte]] {.async: (raises: [CancelledError]).} =
223+
## Returns the next entry from the queue
224+
echo "getNext: ", self.queue.len, " entries in queue"
225+
if (self.queue.len == 0 and self.finished):
226+
return @[]
227+
echo "getNext[2]: ", self.queue.len, " entries in queue"
228+
let chunk = await self.queue.popFirst()
229+
echo "getNext: ", chunk.len, " bytes"
230+
return chunk
231+
232+
proc newDirectoryDownloader*(node: CodexNodeRef): DirectoryDownloader =
233+
## Creates a new DirectoryDownloader instance
234+
DirectoryDownloader(
235+
node: node,
236+
queue: newAsyncQueue[seq[byte]](),
237+
finished: false,
238+
trackedFutures: TrackedFutures(),
239+
)

0 commit comments

Comments
 (0)