Skip to content

Commit 5f8b8ba

Browse files
authored
Merge pull request #2 from ncthbrt/fixes/v4-compatibility
Fixes/v4 compatibility
2 parents de9efb8 + 660abd7 commit 5f8b8ba

File tree

7 files changed

+302
-150
lines changed

7 files changed

+302
-150
lines changed

lib/index.js

Lines changed: 71 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,17 @@
1-
const { AbstractPersistenceEngine, PersistedEvent } = require('nact/lib/persistence');
2-
const PgRx = require('pg-reactive');
3-
const { Promise } = require('bluebird');
1+
const { AbstractPersistenceEngine, PersistedEvent, PersistedSnapshot } = require('nact/lib/persistence');
42
require('rxjs');
3+
const pgp = require('pg-promise')();
4+
const Rx = require('rxjs');
55
const { create } = require('./schema');
66
const assert = require('assert');
77

88
class PostgresPersistenceEngine extends AbstractPersistenceEngine {
99
constructor (connectionString, { createIfNotExists = true, tablePrefix = '', ...settings } = {}) {
1010
super();
11-
this.db = new PgRx(connectionString);
11+
this.db = pgp(connectionString);
1212
this.tablePrefix = tablePrefix;
1313
if (createIfNotExists) {
14-
this.db.query(create(settings.tablePrefix)).catch(console.error).subscribe();
14+
this.db.none(create(settings.tablePrefix)).catch(console.error);
1515
}
1616
}
1717

@@ -25,6 +25,15 @@ class PostgresPersistenceEngine extends AbstractPersistenceEngine {
2525
);
2626
}
2727

28+
static mapDbModelToSnapshotDomainModel (dbSnapshot) {
29+
return new PersistedSnapshot(
30+
dbSnapshot.data,
31+
Number.parseInt(dbSnapshot.sequence_nr),
32+
dbSnapshot.persistence_key,
33+
Number.parseInt(dbSnapshot.created_at)
34+
);
35+
}
36+
2837
events (persistenceKey, offset = 0, limit = null, tags) {
2938
assert(typeof (persistenceKey) === 'string');
3039
assert(Number.isInteger(offset));
@@ -39,34 +48,73 @@ class PostgresPersistenceEngine extends AbstractPersistenceEngine {
3948
OFFSET $2
4049
LIMIT $3
4150
`;
42-
const args = [ persistenceKey, offset, limit, tags ].filter(x => x !== undefined);
4351

44-
return this.db.query(query, args)
45-
.retry(5)
46-
.map(PostgresPersistenceEngine.mapDbModelToDomainModel);
52+
const args = [persistenceKey, offset, limit, tags].filter(x => x !== undefined);
53+
54+
return Rx.Observable
55+
.of([1])
56+
// Perform query
57+
.mergeMap((_) => this.db.any(query, args))
58+
// Retry the query if it fails
59+
.retry(5)
60+
// Flatten array so that it is returned as a stream of events
61+
.mergeMap(x => x)
62+
.map(PostgresPersistenceEngine.mapDbModelToDomainModel);
4763
}
4864

4965
persist (persistedEvent) {
50-
return new Promise((resolve, reject) => {
51-
const query = ` INSERT INTO ${this.tablePrefix}event_journal (
66+
const query = `
67+
INSERT INTO ${this.tablePrefix}event_journal (
68+
persistence_key,
69+
sequence_nr,
70+
created_at,
71+
data,
72+
tags
73+
) VALUES ($1, $2, $3, $4, $5)
74+
RETURNING ordering;
75+
`;
76+
return this.db.one(
77+
query, [
78+
persistedEvent.key,
79+
persistedEvent.sequenceNumber,
80+
persistedEvent.createdAt,
81+
persistedEvent.data,
82+
persistedEvent.tags
83+
]
84+
);
85+
}
86+
87+
latestSnapshot (persistenceKey) {
88+
assert(typeof (persistenceKey) === 'string');
89+
90+
const query = ` SELECT * from ${this.tablePrefix}snapshot_store
91+
WHERE persistence_key = $1
92+
AND is_deleted = false
93+
ORDER BY sequence_nr DESC
94+
LIMIT 1
95+
`;
96+
97+
return this.db.oneOrNone(query, [persistenceKey]).then(PostgresPersistenceEngine.mapDbModelToSnapshotDomainModel);
98+
}
99+
100+
takeSnapshot (persistedSnapshot) {
101+
const query = ` INSERT INTO ${this.tablePrefix}snapshot_store (
52102
persistence_key,
53103
sequence_nr,
54104
created_at,
55-
data,
56-
tags
105+
data
57106
)
58-
VALUES ($1, $2, $3, $4, $5)
107+
VALUES ($1, $2, $3, $4)
59108
RETURNING ordering;
60109
`;
61-
this.db.query(
62-
query,
63-
[ persistedEvent.key,
64-
persistedEvent.sequenceNumber,
65-
persistedEvent.createdAt,
66-
persistedEvent.data,
67-
persistedEvent.tags
68-
]).catch(e => { reject(e); return e; }).subscribe(resolve);
69-
});
110+
return this.db.one(
111+
query, [
112+
persistedSnapshot.key,
113+
persistedSnapshot.sequenceNumber,
114+
persistedSnapshot.createdAt,
115+
persistedSnapshot.data
116+
]
117+
);
70118
}
71119
}
72120

lib/schema/index.js

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,15 @@ module.exports.create = (tablePrefix = '') => `
99
tags TEXT ARRAY DEFAULT ARRAY[]::TEXT[],
1010
CONSTRAINT event_journal_uq UNIQUE (persistence_key, sequence_nr)
1111
);
12+
CREATE TABLE IF NOT EXISTS ${tablePrefix}snapshot_store (
13+
ordering BIGSERIAL NOT NULL PRIMARY KEY,
14+
persistence_key VARCHAR(255) NOT NULL,
15+
sequence_nr BIGINT NOT NULL,
16+
created_at BIGINT NOT NULL,
17+
data JSONB NOT NULL,
18+
is_deleted BOOLEAN NOT NULL DEFAULT FALSE
19+
);
1220
`;
1321

14-
module.exports.destroy = (tablePrefix = '') => `DROP TABLE IF EXISTS ${tablePrefix}event_journal CASCADE;`;
22+
module.exports.destroy = (tablePrefix = '') => `DROP TABLE IF EXISTS ${tablePrefix}event_journal CASCADE;
23+
DROP TABLE IF EXISTS ${tablePrefix}snapshot_store CASCADE;`;

package.json

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "nact-persistence-postgres",
3-
"version": "3.1.5",
3+
"version": "4.0.0",
44
"description": "nact-persistence-postgres ⇒ nact + postresql = your services have never been so persistent",
55
"main": "lib/index.js",
66
"scripts": {
@@ -16,24 +16,24 @@
1616
},
1717
"keywords": [
1818
"PostgreSQL",
19-
"persistence",
20-
"event sourcing",
19+
"persistence",
20+
"event sourcing",
2121
"actors",
22-
"actor-model",
22+
"actor-model",
2323
"akka",
2424
"akka.net",
2525
"non-blocking",
2626
"actor system",
27-
"erlang"
27+
"erlang"
2828
],
2929
"author": "Nick Cuthbert (https://github.com/ncthbrt)",
3030
"license": "Apache-2.0",
3131
"bugs": {
3232
"url": "http://github.com/ncthbrt/nact-persistence-postgres/issues"
3333
},
3434
"dependencies": {
35-
"nact": "^3.1.4",
36-
"pg-reactive": "^0.3.2",
35+
"nact": "^4.0.0",
36+
"pg-promise": "^7.3.2",
3737
"rxjs": "^5.4.3"
3838
},
3939
"devDependencies": {
@@ -44,7 +44,6 @@
4444
"husky": "^0.14.3",
4545
"mocha": "^4.0.0",
4646
"nyc": "^11.2.1",
47-
"pg-promise": "^7.0.0",
4847
"semistandard": "^11.0.0"
4948
}
5049
}

package.json.orig

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
{
2+
"name": "nact-persistence-postgres",
3+
<<<<<<< HEAD
4+
"version": "4.0.0",
5+
=======
6+
"version": "3.1.5",
7+
>>>>>>> origin/master
8+
"description": "nact-persistence-postgres ⇒ nact + postresql = your services have never been so persistent",
9+
"main": "lib/index.js",
10+
"scripts": {
11+
"test": "yarn run lint && nyc mocha ./test/*.js",
12+
"interactive-cover": "nyc --reporter=html mocha ./test/*.js && open ./coverage/index.html",
13+
"lint": "semistandard",
14+
"coverage": "nyc report --reporter=text-lcov | coveralls",
15+
"prepush": "yarn run test"
16+
},
17+
"repository": {
18+
"type": "git",
19+
"url": "git://github.com/ncthbrt/nact-persistence-postgres"
20+
},
21+
"keywords": [
22+
"PostgreSQL",
23+
"persistence",
24+
"event sourcing",
25+
"actors",
26+
"actor-model",
27+
"akka",
28+
"akka.net",
29+
"non-blocking",
30+
"actor system",
31+
"erlang"
32+
],
33+
"author": "Nick Cuthbert (https://github.com/ncthbrt)",
34+
"license": "Apache-2.0",
35+
"bugs": {
36+
"url": "http://github.com/ncthbrt/nact-persistence-postgres/issues"
37+
},
38+
"dependencies": {
39+
"nact": "^4.0.0",
40+
"pg-promise": "^7.3.2",
41+
"rxjs": "^5.4.3"
42+
},
43+
"devDependencies": {
44+
"chai": "^4.1.1",
45+
"chai-as-promised": "^7.1.1",
46+
"coveralls": "^3.0.0",
47+
"eslint": "^4.8.0",
48+
"husky": "^0.14.3",
49+
"mocha": "^4.0.0",
50+
"nyc": "^11.2.1",
51+
"semistandard": "^11.0.0"
52+
}
53+
}

scripts/teardown-db.sh

100644100755
File mode changed.

test/postgres-persistence.js

Lines changed: 61 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ chai.should();
77
const { Promise } = require('bluebird');
88
const delay = Promise.delay;
99
const { PostgresPersistenceEngine } = require('../lib');
10-
const { PersistedEvent } = require('nact/lib/persistence');
10+
const { PersistedEvent, PersistedSnapshot } = require('nact/lib/persistence');
1111
const pgp = require('pg-promise')();
1212
const { destroy } = require('../lib/schema');
1313

@@ -29,7 +29,10 @@ const connectionString = 'postgres://postgres:testpassword@localhost:5431/testdb
2929
describe('PostgresPersistenceEngine', function () {
3030
const db = pgp(connectionString);
3131

32-
afterEach(() => { db.query(destroy()); });
32+
afterEach(() => {
33+
db.query(destroy());
34+
});
35+
3336
it('should not create database if createIfNotExists is set to false', async function () {
3437
new PostgresPersistenceEngine(connectionString, { createIfNotExists: false });
3538
await delay(300);
@@ -41,7 +44,9 @@ describe('PostgresPersistenceEngine', function () {
4144
});
4245

4346
describe('#persist', function () {
44-
afterEach(() => { db.query(destroy()); });
47+
afterEach(() => {
48+
db.query(destroy());
49+
});
4550

4651
it('should store values in database', async function () {
4752
const engine = new PostgresPersistenceEngine(connectionString);
@@ -64,6 +69,56 @@ describe('PostgresPersistenceEngine', function () {
6469
});
6570
});
6671

72+
describe('#takeSnapshot', function () {
73+
afterEach(() => {
74+
db.query(destroy());
75+
});
76+
77+
it('should store values in database', async function () {
78+
const engine = new PostgresPersistenceEngine(connectionString);
79+
await retry(async () => {
80+
const snapshot1 = new PersistedSnapshot({ message: 'hello' }, 1, 'test');
81+
const snapshot2 = new PersistedSnapshot({ message: 'goodbye' }, 2, 'test');
82+
const snapshot3 = new PersistedSnapshot({ message: 'hello' }, 1, 'test2');
83+
await engine.takeSnapshot(snapshot1);
84+
await engine.takeSnapshot(snapshot2);
85+
await engine.takeSnapshot(snapshot3);
86+
87+
const result =
88+
(await db.many('SELECT * FROM snapshot_store WHERE persistence_key = \'test\' ORDER BY sequence_nr'))
89+
.map(PostgresPersistenceEngine.mapDbModelToSnapshotDomainModel);
90+
91+
result.should.be.lengthOf(2).and.deep.equal([snapshot1, snapshot2]);
92+
const result2 = await db.one('SELECT * FROM snapshot_store WHERE persistence_key = \'test2\'');
93+
PostgresPersistenceEngine.mapDbModelToSnapshotDomainModel(result2).should.deep.equal(snapshot3);
94+
}, 7, 50);
95+
});
96+
});
97+
98+
describe('#latestSnapshot', function () {
99+
const snapshot1 = new PersistedSnapshot({ message: 'hello' }, 1, 'test3');
100+
const snapshot2 = new PersistedSnapshot({ message: 'goodbye' }, 2, 'test3');
101+
const snapshot3 = new PersistedSnapshot({ message: 'hello again' }, 3, 'test3');
102+
let engine;
103+
104+
beforeEach(async () => {
105+
engine = new PostgresPersistenceEngine(connectionString);
106+
await retry(async () => {
107+
await engine.takeSnapshot(snapshot1);
108+
await engine.takeSnapshot(snapshot2);
109+
await engine.takeSnapshot(snapshot3);
110+
}, 7, 50);
111+
});
112+
afterEach(() => {
113+
db.query(destroy());
114+
});
115+
116+
it('should be able to retrieve latest snapshot', async function () {
117+
const result = await engine.latestSnapshot('test3');
118+
result.should.deep.equal(snapshot3);
119+
});
120+
});
121+
67122
describe('#events', async function () {
68123
const event1 = new PersistedEvent({ message: 'hello' }, 1, 'test3', ['a', 'b', 'c']);
69124
const event2 = new PersistedEvent({ message: 'goodbye' }, 2, 'test3', ['a']);
@@ -78,7 +133,9 @@ describe('PostgresPersistenceEngine', function () {
78133
await engine.persist(event3);
79134
}, 7, 50);
80135
});
81-
afterEach(() => { db.query(destroy()); });
136+
afterEach(() => {
137+
db.query(destroy());
138+
});
82139

83140
it('should be able to retrieve previously persisted events', async function () {
84141
const result = await new Promise((resolve, reject) => {

0 commit comments

Comments
 (0)