22
33const { Key, Adapter } = require ( 'interface-datastore' )
44const { encodeBase32, keyToTopic, topicToKey } = require ( './utils' )
5+ const uint8ArrayEquals = require ( 'uint8arrays/equals' )
56
67const errcode = require ( 'err-code' )
78const debug = require ( 'debug' )
8- const log = debug ( 'datastore-pubsub:publisher' )
9- log . error = debug ( 'datastore-pubsub:publisher:error' )
9+ const log = Object . assign ( debug ( 'datastore-pubsub:publisher' ) , {
10+ error : debug ( 'datastore-pubsub:publisher:error' )
11+ } )
12+
13+ /**
14+ * @typedef {import('peer-id') } PeerId
15+ * @typedef {import('./types').Validator } Validator
16+ * @typedef {import('./types').SubscriptionKeyFn } SubscriptionKeyFn
17+ * @typedef {import('libp2p-interfaces/src/pubsub/message').Message } PubSubMessage
18+ */
1019
1120// DatastorePubsub is responsible for providing an api for pubsub to be used as a datastore with
1221// [TieredDatastore]{@link https://github.com/ipfs/js-datastore-core/blob/master/src/tiered.js }
1322class DatastorePubsub extends Adapter {
1423 /**
1524 * Creates an instance of DatastorePubsub.
1625 *
17- * @param {* } pubsub - pubsub implementation.
18- * @param {* } datastore - datastore instance.
19- * @param {* } peerId - peer-id instance.
20- * @param {Object } validator - validator functions.
21- * @param {(record: uint8Array, peerId: PeerId) => boolean } validator.validate - function to validate a record.
22- * @param {(received: uint8Array, current: uint8Array) => boolean } validator.select - function to select the newest between two records.
23- * @param {function(key, callback) } subscriptionKeyFn - optional function to manipulate the key topic received before processing it.
26+ * @param {import('libp2p-interfaces/src/pubsub') } pubsub - pubsub implementation
27+ * @param {import('interface-datastore').Datastore } datastore - datastore instance
28+ * @param {PeerId } peerId - peer-id instance
29+ * @param {Validator } validator - validator functions
30+ * @param {SubscriptionKeyFn } [subscriptionKeyFn] - function to manipulate the key topic received before processing it
2431 * @memberof DatastorePubsub
2532 */
2633 constructor ( pubsub , datastore , peerId , validator , subscriptionKeyFn ) {
@@ -57,9 +64,9 @@ class DatastorePubsub extends Adapter {
5764 *
5865 * @param {Uint8Array } key - identifier of the value to be published.
5966 * @param {Uint8Array } val - value to be propagated.
60- * @returns {Promise }
6167 */
62- async put ( key , val ) { // eslint-disable-line require-await
68+ // @ts -ignore Datastores take keys as Keys, this one takes Uint8Arrays
69+ async put ( key , val ) {
6370 if ( ! ( key instanceof Uint8Array ) ) {
6471 const errMsg = 'datastore key does not have a valid format'
6572
@@ -79,15 +86,15 @@ class DatastorePubsub extends Adapter {
7986 log ( `publish value for topic ${ stringifiedTopic } ` )
8087
8188 // Publish record to pubsub
82- return this . _pubsub . publish ( stringifiedTopic , val )
89+ await this . _pubsub . publish ( stringifiedTopic , val )
8390 }
8491
8592 /**
8693 * Try to subscribe a topic with Pubsub and returns the local value if available.
8794 *
8895 * @param {Uint8Array } key - identifier of the value to be subscribed.
89- * @returns {Promise<Uint8Array> }
9096 */
97+ // @ts -ignore Datastores take keys as Keys, this one takes Uint8Arrays
9198 async get ( key ) {
9299 if ( ! ( key instanceof Uint8Array ) ) {
93100 const errMsg = 'datastore key does not have a valid format'
@@ -106,7 +113,8 @@ class DatastorePubsub extends Adapter {
106113
107114 // subscribe
108115 try {
109- await this . _pubsub . subscribe ( stringifiedTopic , this . _onMessage )
116+ this . _pubsub . on ( stringifiedTopic , this . _onMessage )
117+ await this . _pubsub . subscribe ( stringifiedTopic )
110118 } catch ( err ) {
111119 const errMsg = `cannot subscribe topic ${ stringifiedTopic } `
112120
@@ -127,10 +135,16 @@ class DatastorePubsub extends Adapter {
127135 unsubscribe ( key ) {
128136 const stringifiedTopic = keyToTopic ( key )
129137
130- return this . _pubsub . unsubscribe ( stringifiedTopic , this . _onMessage )
138+ this . _pubsub . removeListener ( stringifiedTopic , this . _onMessage )
139+ return this . _pubsub . unsubscribe ( stringifiedTopic )
131140 }
132141
133- // Get record from local datastore
142+ /**
143+ * Get record from local datastore
144+ *
145+ * @private
146+ * @param {Uint8Array } key
147+ */
134148 async _getLocal ( key ) {
135149 // encode key - base32(/ipns/{cid})
136150 const routingKey = new Key ( '/' + encodeBase32 ( key ) , false )
@@ -161,7 +175,11 @@ class DatastorePubsub extends Adapter {
161175 return dsVal
162176 }
163177
164- // handles pubsub subscription messages
178+ /**
179+ * handles pubsub subscription messages
180+ *
181+ * @param {PubSubMessage } msg
182+ */
165183 async _onMessage ( msg ) {
166184 const { data, from, topicIDs } = msg
167185 let key
@@ -200,7 +218,12 @@ class DatastorePubsub extends Adapter {
200218 }
201219 }
202220
203- // Store the received record if it is better than the current stored
221+ /**
222+ * Store the received record if it is better than the current stored
223+ *
224+ * @param {Uint8Array } key
225+ * @param {Uint8Array } data
226+ */
204227 async _storeIfSubscriptionIsBetter ( key , data ) {
205228 let isBetter = false
206229
@@ -217,20 +240,35 @@ class DatastorePubsub extends Adapter {
217240 }
218241 }
219242
220- // Validate record according to the received validation function
243+ /**
244+ * Validate record according to the received validation function
245+ *
246+ * @param {Uint8Array } value
247+ * @param {Uint8Array } peerId
248+ */
221249 async _validateRecord ( value , peerId ) { // eslint-disable-line require-await
222250 return this . _validator . validate ( value , peerId )
223251 }
224252
225- // Select the best record according to the received select function.
253+ /**
254+ * Select the best record according to the received select function
255+ *
256+ * @param {Uint8Array } receivedRecord
257+ * @param {Uint8Array } currentRecord
258+ */
226259 async _selectRecord ( receivedRecord , currentRecord ) {
227260 const res = await this . _validator . select ( receivedRecord , currentRecord )
228261
229262 // If the selected was the first (0), it should be stored (true)
230263 return res === 0
231264 }
232265
233- // Verify if the record received through pubsub is valid and better than the one currently stored
266+ /**
267+ * Verify if the record received through pubsub is valid and better than the one currently stored
268+ *
269+ * @param {Uint8Array } key
270+ * @param {Uint8Array } val
271+ */
234272 async _isBetter ( key , val ) {
235273 // validate received record
236274 let error , valid
@@ -261,64 +299,27 @@ class DatastorePubsub extends Adapter {
261299 }
262300
263301 // if the same record, do not need to store
264- if ( currentRecord . equals ( val ) ) {
302+ if ( uint8ArrayEquals ( currentRecord , val ) ) {
265303 return false
266304 }
267305
268306 // verify if the received record should replace the current one
269307 return this . _selectRecord ( val , currentRecord )
270308 }
271309
272- // add record to datastore
310+ /**
311+ * add record to datastore
312+ *
313+ * @param {Uint8Array } key
314+ * @param {Uint8Array } data
315+ */
273316 async _storeRecord ( key , data ) {
274317 // encode key - base32(/ipns/{cid})
275318 const routingKey = new Key ( '/' + encodeBase32 ( key ) , false )
276319
277320 await this . _datastore . put ( routingKey , data )
278321 log ( `record for ${ keyToTopic ( key ) } was stored in the datastore` )
279322 }
280-
281- open ( ) {
282- const errMsg = 'open function was not implemented yet'
283-
284- log . error ( errMsg )
285- throw errcode ( new Error ( errMsg ) , 'ERR_NOT_IMPLEMENTED_YET' )
286- }
287-
288- has ( key ) {
289- const errMsg = 'has function was not implemented yet'
290-
291- log . error ( errMsg )
292- throw errcode ( new Error ( errMsg ) , 'ERR_NOT_IMPLEMENTED_YET' )
293- }
294-
295- delete ( key ) {
296- const errMsg = 'delete function was not implemented yet'
297-
298- log . error ( errMsg )
299- throw errcode ( new Error ( errMsg ) , 'ERR_NOT_IMPLEMENTED_YET' )
300- }
301-
302- close ( ) {
303- const errMsg = 'close function was not implemented yet'
304-
305- log . error ( errMsg )
306- throw errcode ( new Error ( errMsg ) , 'ERR_NOT_IMPLEMENTED_YET' )
307- }
308-
309- batch ( ) {
310- const errMsg = 'batch function was not implemented yet'
311-
312- log . error ( errMsg )
313- throw errcode ( new Error ( errMsg ) , 'ERR_NOT_IMPLEMENTED_YET' )
314- }
315-
316- query ( ) {
317- const errMsg = 'query function was not implemented yet'
318-
319- log . error ( errMsg )
320- throw errcode ( new Error ( errMsg ) , 'ERR_NOT_IMPLEMENTED_YET' )
321- }
322323}
323324
324325exports = module . exports = DatastorePubsub
0 commit comments