Skip to content

Commit 25eca46

Browse files
authored
feat: add depth and breadth-first graph walkers to utils package (#871)
Implement dag walking in a way that will not overflow the stack when walking over arbitrarily large DAGs.
1 parent 6552c4b commit 25eca46

File tree

3 files changed

+324
-0
lines changed

3 files changed

+324
-0
lines changed

packages/utils/src/graph-walker.ts

Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
import { Queue } from '@libp2p/utils'
2+
import toBuffer from 'it-to-buffer'
3+
import { createUnsafe } from 'multiformats/block'
4+
import type { CodecLoader } from '@helia/interface'
5+
import type { Blockstore } from 'interface-blockstore'
6+
import type { AbortOptions } from 'interface-store'
7+
import type { BlockView, CID, Version } from 'multiformats'
8+
9+
/**
 * Dependencies a graph walker needs in order to load and decode blocks.
 */
export interface GraphWalkerComponents {
  /**
   * Store that block bytes are read from while walking
   */
  blockstore: Blockstore

  /**
   * Looks up the codec for a block; the walkers call it with the code of the
   * CID being loaded and await the result
   */
  getCodec: CodecLoader
}
13+
14+
/**
 * Configuration accepted by the walker factories.
 *
 * No options are declared at present; the type exists so that callers can
 * already pass an init object.
 */
export interface GraphWalkerInit {

}
17+
18+
/**
 * A single node yielded while walking a graph.
 */
export interface GraphNode <T = unknown, C extends number = number, A extends number = number, V extends Version = 0 | 1> {
  /**
   * The decoded block for this node
   */
  block: BlockView<T, C, A, V>

  /**
   * Distance from the CID the walk started at - the starting node has depth
   * `0` and every followed link adds one
   */
  depth: number

  /**
   * The CIDs traversed to reach this node, beginning with the CID the walk
   * started at and ending with this node's own CID
   */
  path: CID[]
}
23+
24+
/**
 * Traverses the graph reachable from a CID, yielding each visited node.
 */
export interface GraphWalker {
  /**
   * Walk the graph that starts at `cid`.
   *
   * @param cid - the CID to start walking from
   * @param options - optional abort options
   * @returns an async generator of the nodes that were visited
   */
  walk <T = any> (cid: CID, options?: AbortOptions): AsyncGenerator<GraphNode<T>>
}
27+
28+
/**
 * Create a {@link GraphWalker} backed by `DepthFirstGraphWalker`.
 *
 * @param components - the blockstore and codec loader used to read blocks
 * @param init - walker configuration, defaults to an empty object
 * @returns the new walker
 */
export function depthFirstWalker (components: GraphWalkerComponents, init: GraphWalkerInit = {}): GraphWalker {
  return new DepthFirstGraphWalker(components, init)
}
31+
32+
/**
 * Create a {@link GraphWalker} backed by `BreadthFirstGraphWalker`.
 *
 * @param components - the blockstore and codec loader used to read blocks
 * @param init - walker configuration, defaults to an empty object
 * @returns the new walker
 */
export function breadthFirstWalker (components: GraphWalkerComponents, init: GraphWalkerInit = {}): GraphWalker {
  return new BreadthFirstGraphWalker(components, init)
}
35+
36+
/**
 * Options handed to each queued job - the caller's abort options plus the
 * position in the graph of the block the job should load.
 */
interface JobOptions extends AbortOptions {
  /**
   * The CID of the block this job loads
   */
  cid: CID

  /**
   * How many links were followed from the start of the walk to reach `cid`
   */
  depth: number

  /**
   * CIDs traversed from the start of the walk up to and including `cid`
   */
  path: CID[]
}
41+
42+
/**
 * Walks a graph using a job queue instead of recursion: every block that is
 * loaded enqueues one job per outgoing link, and queued jobs are ordered so
 * that those with the greatest depth sort first.
 */
class DepthFirstGraphWalker {
  private readonly components: GraphWalkerComponents

  constructor (components: GraphWalkerComponents, init: GraphWalkerInit = {}) {
    this.components = components
  }

  /**
   * Walk the graph that starts at `cid`.
   *
   * @param cid - the CID to start walking from
   * @param options - optional abort options, forwarded to every blockstore read
   * @returns an async generator of the visited nodes
   */
  async * walk <T = any> (cid: CID, options?: AbortOptions): AsyncGenerator<GraphNode<T>> {
    const queue = new Queue<GraphNode<T>, JobOptions>({
      // one block is loaded at a time
      concurrency: 1,
      // deeper jobs compare lower than shallower ones; equal depths compare as
      // 0 so the order of siblings is left to the queue's sort
      sort: (a, b) => b.options.depth - a.options.depth
    })

    const gen = queue.toGenerator()

    // shared failure handler for every job added to the queue
    // NOTE(review): the promise returned by gen.throw is not awaited - confirm
    // that a rejection here cannot go unhandled
    const onError = (err: unknown): void => {
      gen.throw(err)
      queue.abort()
    }

    const job = async (options: JobOptions): Promise<GraphNode<T>> => {
      const cid = options.cid
      const bytes = await toBuffer(this.components.blockstore.get(cid, options))
      const block = createUnsafe({
        cid,
        bytes,
        codec: await this.components.getCodec(cid.code)
      })

      // schedule a job for each link found in this block, one level deeper
      for (const [, linkedCid] of block.links()) {
        queue.add(job, {
          ...options,
          cid: linkedCid,
          depth: options.depth + 1,
          path: [...options.path, linkedCid]
        })
          .catch(onError)
      }

      return {
        block,
        depth: options.depth,
        path: options.path
      }
    }

    // seed the queue with the starting block
    queue.add(job, {
      ...options,
      cid,
      depth: 0,
      path: [cid]
    })
      .catch(onError)

    yield * gen
  }
}
110+
111+
/**
 * Walks a graph using a job queue instead of recursion: every block that is
 * loaded enqueues one job per outgoing link, and queued jobs are ordered so
 * that those with the smallest depth sort first.
 */
class BreadthFirstGraphWalker {
  private readonly components: GraphWalkerComponents

  constructor (components: GraphWalkerComponents, init: GraphWalkerInit = {}) {
    this.components = components
  }

  /**
   * Walk the graph that starts at `cid`.
   *
   * @param cid - the CID to start walking from
   * @param options - optional abort options, forwarded to every blockstore read
   * @returns an async generator of the visited nodes
   */
  async * walk <T = any> (cid: CID, options?: AbortOptions): AsyncGenerator<GraphNode<T>> {
    const queue = new Queue<GraphNode<T>, JobOptions>({
      // one block is loaded at a time
      concurrency: 1,
      // shallower jobs compare lower than deeper ones; equal depths compare as
      // 0 so the order of siblings is left to the queue's sort
      sort: (a, b) => a.options.depth - b.options.depth
    })

    const gen = queue.toGenerator()

    // shared failure handler for every job added to the queue
    // NOTE(review): the promise returned by gen.throw is not awaited - confirm
    // that a rejection here cannot go unhandled
    const onError = (err: unknown): void => {
      gen.throw(err)
      queue.abort()
    }

    const job = async (options: JobOptions): Promise<GraphNode<T>> => {
      const cid = options.cid
      const bytes = await toBuffer(this.components.blockstore.get(cid, options))
      const block = createUnsafe({
        cid,
        bytes,
        codec: await this.components.getCodec(cid.code)
      })

      // schedule a job for each link found in this block, one level deeper
      for (const [, linkedCid] of block.links()) {
        queue.add(job, {
          ...options,
          cid: linkedCid,
          depth: options.depth + 1,
          path: [...options.path, linkedCid]
        })
          .catch(onError)
      }

      return {
        block,
        depth: options.depth,
        path: options.path
      }
    }

    // seed the queue with the starting block
    queue.add(job, {
      ...options,
      cid,
      depth: 0,
      path: [cid]
    })
      .catch(onError)

    yield * gen
  }
}

packages/utils/src/index.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,9 @@ export type { AbstractCreateSessionOptions, BlockstoreSessionEvents, AbstractSes
3636

3737
export type { BlockStorage, BlockStorageInit }
3838

39+
export { breadthFirstWalker, depthFirstWalker } from './graph-walker.ts'
40+
export type { GraphWalkerComponents, GraphWalkerInit, GraphNode, GraphWalker } from './graph-walker.ts'
41+
3942
/**
4043
* Options used to create a Helia node.
4144
*/
Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
import * as dagCbor from '@ipld/dag-cbor'
2+
import { expect } from 'aegir/chai'
3+
import { MemoryBlockstore } from 'blockstore-core'
4+
import all from 'it-all'
5+
import map from 'it-map'
6+
import { CID } from 'multiformats/cid'
7+
import { sha256 } from 'multiformats/hashes/sha2'
8+
import { breadthFirstWalker, depthFirstWalker } from '../src/graph-walker.ts'
9+
import type { CodecLoader } from '@helia/interface'
10+
import type { Blockstore } from 'interface-blockstore'
11+
12+
/**
 * Shape of the dag-cbor objects stored as test fixtures.
 */
interface Node {
  /**
   * Label used to identify the node in assertions
   */
  name: string

  /**
   * Links to child nodes, omitted for leaves
   */
  children?: CID[]
}
16+
17+
/**
 * A fixture that has been written to the blockstore.
 */
interface Child {
  /**
   * CID the encoded object was stored under
   */
  cid: CID

  /**
   * The dag-cbor encoded bytes
   */
  buf: Uint8Array

  /**
   * The object that was encoded
   */
  obj: any
}
22+
23+
/**
 * Encode `obj` as dag-cbor, store the bytes in `blockstore` under a CIDv1
 * derived from their sha2-256 digest, and return the pieces for later use.
 *
 * @param obj - the value to encode
 * @param blockstore - where the encoded block is written
 * @returns the CID, the encoded bytes and the original object
 */
async function createNode (obj: any, blockstore: Blockstore): Promise<Child> {
  const buf = dagCbor.encode(obj)
  const hash = await sha256.digest(buf)
  const cid = CID.createV1(dagCbor.code, hash)

  await blockstore.put(cid, buf)

  return {
    cid,
    buf,
    obj
  }
}
36+
37+
describe('graph-walker', () => {
  let blockstore: Blockstore
  let nodes: Record<string, Child>
  let getCodec: CodecLoader

  beforeEach(async () => {
    blockstore = new MemoryBlockstore()
    // every fixture is dag-cbor so the same codec is returned for any code
    getCodec = (): any => dagCbor

    // create a graph:
    // root
    //   -> a
    //     -> d
    //     -> e
    //     -> f
    //   -> b
    //     -> g
    //     -> h
    //     -> i
    //   -> c
    //     -> j
    //     -> k
    //     -> l

    nodes = {}

    // leaves are created first so their CIDs can be linked from the parents
    for (const name of ['d', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l']) {
      nodes[name] = await createNode({
        name
      }, blockstore)
    }

    nodes.a = await createNode({
      name: 'a',
      children: [
        nodes.d.cid,
        nodes.e.cid,
        nodes.f.cid
      ]
    }, blockstore)

    nodes.b = await createNode({
      name: 'b',
      children: [
        nodes.g.cid,
        nodes.h.cid,
        nodes.i.cid
      ]
    }, blockstore)

    nodes.c = await createNode({
      name: 'c',
      children: [
        nodes.j.cid,
        nodes.k.cid,
        nodes.l.cid
      ]
    }, blockstore)

    nodes.root = await createNode({
      name: 'root',
      children: [
        nodes.a.cid,
        nodes.b.cid,
        nodes.c.cid
      ]
    }, blockstore)
  })

  describe('depth-first', () => {
    it('should walk depth-first', async () => {
      const walker = depthFirstWalker({
        blockstore,
        getCodec
      })

      // collect the name of every visited node in the order it was yielded
      const result = await all(map(walker.walk(nodes.root.cid), (node) => {
        const obj = dagCbor.decode<Node>(node.block.bytes)

        return obj.name
      }))

      expect(result).to.deep.equal([
        'root', 'a', 'd', 'e', 'f', 'b', 'g', 'h', 'i', 'c', 'j', 'k', 'l'
      ])
    })
  })

  describe('breadth-first', () => {
    it('should walk breadth-first', async () => {
      const walker = breadthFirstWalker({
        blockstore,
        getCodec
      })

      // collect the name of every visited node in the order it was yielded
      const result = await all(map(walker.walk(nodes.root.cid), (node) => {
        const obj = dagCbor.decode<Node>(node.block.bytes)

        return obj.name
      }))

      expect(result).to.deep.equal([
        'root', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l'
      ])
    })
  })
})

0 commit comments

Comments
 (0)