Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 8 additions & 114 deletions bin/commands/export.js
Original file line number Diff line number Diff line change
@@ -1,113 +1,13 @@
const fs = require('fs').promises
const p = require('path')
const { EventEmitter } = require('events')

const cliProgress = require('cli-progress')
const mirrorFolder = require('mirror-folder')
const low = require('last-one-wins')
const streamx = require('streamx')
const pump = require('pump')
const { flags } = require('@oclif/command')

const DaemonCommand = require('../../lib/cli')
const { HyperdriveClient } = require('../..')

const KEY_FILE_PATH = '.hyperdrive-export-key'

class DriveWatcher extends EventEmitter {
constructor (client, drive, opts = {}) {
super()
this.client = client
this.drive = drive
this.recursive = !!opts.recursive
this.drivesByPath = new Map([[ '/', drive ]])
this.versionsByPath = new Map()
this.unwatchesByPath = new Map()
this.watchers = []
this.timer = null
this.emittingStats = false
}

_createDiffer (path, drive) {
// Last-one-wins in case the watch is triggered many times in quick succession.
const self = this
return low(onupdate)

async function onupdate (_, cb) {
const lastVersion = self.versionsByPath.get(path)
try {
const diffStream = await drive.createDiffStream(lastVersion)
const currentVersion = await drive.version()
self.versionsByPath.set(path, currentVersion)
return pump(diffStream, new streamx.Transform({
transform: (data, cb) => {
for (const watcher of self.watchers) {
watcher(p.join(path, data.name))
}
return cb(null)
}
}), err => {
if (err) return cb(err)
return cb(null)
})
} catch (err) {
return cb(err)
}
}
}

async _emitStats () {
if (this.emittingStats) return
this.emittingStats = true
var total = 0
var downloaded = 0
var peers = 0
for (const [path, drive] of this.drivesByPath) {
const driveStats = await drive.stats()
for (const { path, content } of driveStats.stats) {
if (path !== '/' || !content) continue
downloaded += content.downloadedBlocks
total += content.totalBlocks
peers = content.peers
}
}
this.emit('stats', { total, downloaded, peers })
this.emittingStats = false
}

async start () {
// TODO: Handle dynamic (un)mounting.
this.versionsByPath.set('/', await this.drive.version())
this.unwatchesByPath.set('/', this.drive.watch('/', this._createDiffer('/', this.drive)))
const allMounts = await this.drive.mounts({ memory: false, recursive: this.recursive })
for (const { path, opts } of allMounts) {
if (path === '/') continue
const childDrive = await this.client.drive.get({ key: opts.key })
this.drivesByPath.set(path, childDrive)
this.versionsByPath.set(path, opts.version)
this.unwatchesByPath.set(path, childDrive.watch('/', this._createDiffer(path, childDrive)))
}
this.timer = setInterval(this._emitStats.bind(this), 1000)
}

watch (_, onwatch) {
// The watch path is ignored for drives.
this.watchers.push(onwatch)
return () => {
this.watchers.splice(this.watchers.indexOf(onwatch), 1)
}
}

async close () {
for (const [path, unwatch] of this.unwatchesByPath) {
await unwatch()
}
if (this.timer) {
clearInterval(this.timer)
this.timer = null
}
}
}
const { exportKeyFilePath: KEY_FILE_PATH } = require('../../lib/constants')

class ExportCommand extends DaemonCommand {
static usage = 'export [key] [dir]'
Expand Down Expand Up @@ -163,29 +63,23 @@ class ExportCommand extends DaemonCommand {
const drive = await this.client.drive.get({ key })
if (!loadedKey) await saveKeyToFile()

const driveWatcher = new DriveWatcher(this.client, drive, {
recursive: flags.recursive
})
await driveWatcher.start()

const progress = new cliProgress.SingleBar({
format: `Exporting | {bar} | {percentage}% | {value}/{total} Content Blocks | {peers} Peers`
})
console.log(`Exporting ${key.toString('hex')} into ${dir} (Ctrl+c to exit)...`)
console.log()
progress.start(1, 0)
driveWatcher.on('stats', stats => {
progress.setTotal(stats.total)
progress.update(stats.downloaded, { peers: stats.peers })
})

process.on('SIGINT', cleanup)
process.on('SIGTERM', cleanup)

const remoteMirror = mirrorFolder({ fs: drive, name: '/' }, dir, {
watch: driveWatcher.watch.bind(driveWatcher),
keepExisting: true,
ensureParents: true
const driveWatcher = this.client.drive.export(drive, dir, {recursive: flags.recursive})

await driveWatcher.start()

driveWatcher.on('stats', stats => {
progress.setTotal(stats.total)
progress.update(stats.downloaded, { peers: stats.peers })
})

async function cleanup () {
Expand Down
27 changes: 7 additions & 20 deletions bin/commands/import.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ const fs = require('fs').promises
const p = require('path')

const cliProgress = require('cli-progress')
const mirrorFolder = require('mirror-folder')
const { flags } = require('@oclif/command')

const DaemonCommand = require('../../lib/cli')
const { HyperdriveClient } = require('../..')

const IMPORT_KEY_FILE_PATH = '.hyperdrive-import-key'
const EXPORT_KEY_FILE_PATH = '.hyperdrive-export-key'
const {
importKeyFilePath: IMPORT_KEY_FILE_PATH,
exportKeyFilePath: EXPORT_KEY_FILE_PATH
} = require('../../lib/constants')

class ImportCommand extends DaemonCommand {
static usage = 'import [dir] [key]'
Expand Down Expand Up @@ -64,16 +64,8 @@ class ImportCommand extends DaemonCommand {
console.log(`Importing ${args.dir} into ${drive.key.toString('hex')} (Ctrl+c to exit)...`)
console.log()

const localMirror = mirrorFolder(args.dir, { fs: drive, name: '/' }, {
watch: true,
dereference: true,
// When going from fs -> drive, it should overwrite.
keepExisting: false,
ignore: (file, stat, cb) => {
if (shouldIgnore(file)) return process.nextTick(cb, null, true)
return process.nextTick(cb, null, false)
}
})
const localMirror = this.client.drive.import(args.dir, drive)

localMirror.on('pending', ({ name }) => {
if (shouldIgnore(name)) return
progress.setTotal(++total)
Expand Down Expand Up @@ -116,12 +108,7 @@ class ImportCommand extends DaemonCommand {
return fs.writeFile(keyPath, drive.key)
}

function shouldIgnore (name) {
if (!name) return true
if (name.indexOf(EXPORT_KEY_FILE_PATH) !== -1) return true
else if (name.indexOf(IMPORT_KEY_FILE_PATH) !== -1) return true
return false
}

}
}

Expand Down
62 changes: 62 additions & 0 deletions lib/clients/drive.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
const {resolve} = require('path')
const grpc = require('@grpc/grpc-js')
const maybe = require('call-me-maybe')
const collectStream = require('stream-collector')
const pump = require('pump')
const mirrorFolder = require('mirror-folder')
const codecs = require('codecs')
const { Writable, Transform } = require('streamx')

Expand All @@ -27,6 +29,11 @@ const {
fromDriveInfo,
toRPCMetadata: toMetadata
} = require('../common')
const DriveWatcher = require('../driveWatcher')
const {
importKeyFilePath: IMPORT_KEY_FILE_PATH,
exportKeyFilePath: EXPORT_KEY_FILE_PATH
} = require('../constants')

module.exports = class DriveClient {
constructor (endpoint, token) {
Expand Down Expand Up @@ -98,6 +105,61 @@ module.exports = class DriveClient {
})
}))
}

import (dir, drive) {
if (!dir) {
dir = process.cwd()
} else {
dir = resolve(dir)
}

if (!drive.writable) {
console.error('The target drive is not writable!')
return process.exit(1)
}

function shouldIgnore (name) {
if (!name) return true
if (name.indexOf(EXPORT_KEY_FILE_PATH) !== -1) return true
else if (name.indexOf(IMPORT_KEY_FILE_PATH) !== -1) return true
return false
}

return mirrorFolder(dir, { fs: drive, name: '/' }, {
watch: true,
dereference: true,
// When going from fs -> drive, it should overwrite.
keepExisting: false,
ignore: (file, stat, cb) => {
if (shouldIgnore(file)) return process.nextTick(cb, null, true)
return process.nextTick(cb, null, false)
}
})
}

export (drive, dir, opts = { recursive: false }) {
if (!drive) {
throw new Error('missing argument: drive')
}

if (!dir) {
throw new Error('missing argument: dir')
}

dir = resolve(dir)

const driveWatcher = new DriveWatcher(this._client, drive, {
recursive: opts.recursive
})

mirrorFolder({ fs: drive, name: '/' }, dir, {
watch: driveWatcher.watch.bind(driveWatcher),
keepExisting: true,
ensureParents: true
})

return driveWatcher
}
}

class RemoteHyperdrive {
Expand Down
3 changes: 3 additions & 0 deletions lib/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ module.exports = {
unstructuredLog: p.join(DAEMON_ROOT, 'output.log'),
structuredLog: p.join(DAEMON_ROOT, 'log.json'),

importKeyFilePath: '.hyperdrive-import-key',
exportKeyFilePath: '.hyperdrive-export-key',

env: {
endpoint: process.env['HYPERDRIVE_ENDPOINT'],
token: process.env['HYPERDRIVE_TOKEN'],
Expand Down
101 changes: 101 additions & 0 deletions lib/driveWatcher.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
const { EventEmitter } = require('events')
const low = require('last-one-wins')
const pump = require('pump')
const streamx = require('streamx')

class DriveWatcher extends EventEmitter {
constructor (client, drive, opts = {}) {
super()
this.client = client
this.drive = drive
this.recursive = !!opts.recursive
this.drivesByPath = new Map([[ '/', drive ]])
this.versionsByPath = new Map()
this.unwatchesByPath = new Map()
this.watchers = []
this.timer = null
this.emittingStats = false
}

_createDiffer (path, drive) {
// Last-one-wins in case the watch is triggered many times in quick succession.
const self = this
return low(onupdate)

async function onupdate (_, cb) {
const lastVersion = self.versionsByPath.get(path)
try {
const diffStream = await drive.createDiffStream(lastVersion)
const currentVersion = await drive.version()
self.versionsByPath.set(path, currentVersion)
return pump(diffStream, new streamx.Transform({
transform: (data, cb) => {
for (const watcher of self.watchers) {
watcher(p.join(path, data.name))
}
return cb(null)
}
}), err => {
if (err) return cb(err)
return cb(null)
})
} catch (err) {
return cb(err)
}
}
}

async _emitStats () {
if (this.emittingStats) return
this.emittingStats = true
var total = 0
var downloaded = 0
var peers = 0
for (const [path, drive] of this.drivesByPath) {
const driveStats = await drive.stats()
for (const { path, content } of driveStats.stats) {
if (path !== '/' || !content) continue
downloaded += content.downloadedBlocks
total += content.totalBlocks
peers = content.peers
}
}
this.emit('stats', { total, downloaded, peers })
this.emittingStats = false
}

async start () {
// TODO: Handle dynamic (un)mounting.
this.versionsByPath.set('/', await this.drive.version())
this.unwatchesByPath.set('/', this.drive.watch('/', this._createDiffer('/', this.drive)))
const allMounts = await this.drive.mounts({ memory: false, recursive: this.recursive })
for (const { path, opts } of allMounts) {
if (path === '/') continue
const childDrive = await this.client.drive.get({ key: opts.key })
this.drivesByPath.set(path, childDrive)
this.versionsByPath.set(path, opts.version)
this.unwatchesByPath.set(path, childDrive.watch('/', this._createDiffer(path, childDrive)))
}
this.timer = setInterval(this._emitStats.bind(this), 1000)
}

watch (_, onwatch) {
// The watch path is ignored for drives.
this.watchers.push(onwatch)
return () => {
this.watchers.splice(this.watchers.indexOf(onwatch), 1)
}
}

async close () {
for (const [path, unwatch] of this.unwatchesByPath) {
await unwatch()
}
if (this.timer) {
clearInterval(this.timer)
this.timer = null
}
}
}

module.exports = DriveWatcher