Skip to content

Commit 20e5630

Browse files
committed
feat: 🎸 add Log.rebaseBatch() and Log.reset() methods
1 parent 4f26c28 commit 20e5630

File tree

2 files changed

+195
-58
lines changed

2 files changed

+195
-58
lines changed

src/json-crdt/log/Log.ts

Lines changed: 84 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ export class Log<N extends JsonNode = JsonNode<any>, Metadata extends Record<str
7878
*
7979
* @readonly
8080
*/
81-
public readonly patches = new AvlMap<ITimestampStruct, Patch>(compare);
81+
public patches = new AvlMap<ITimestampStruct, Patch>(compare);
8282

8383
private __onPatch: FanOutUnsubscribe;
8484
private __onFlush: FanOutUnsubscribe;
@@ -125,7 +125,6 @@ export class Log<N extends JsonNode = JsonNode<any>, Metadata extends Record<str
125125
public destroy() {
126126
this.__onPatch();
127127
this.__onFlush();
128-
this.patches.clear();
129128
}
130129

131130
/**
@@ -161,22 +160,6 @@ export class Log<N extends JsonNode = JsonNode<any>, Metadata extends Record<str
161160
return clone;
162161
}
163162

164-
/**
165-
* Finds the latest patch for a given session ID.
166-
*
167-
* @param sid Session ID to find the latest patch for.
168-
* @return The latest patch for the given session ID, or `undefined` if no
169-
* such patch exists.
170-
*/
171-
public findMax(sid: number): Patch | undefined {
172-
let curr = this.patches.max;
173-
while (curr) {
174-
if (curr.k.sid === sid) return curr.v;
175-
curr = prev(curr);
176-
}
177-
return;
178-
}
179-
180163
/**
181164
* Advance the start of the log to a specified timestamp, excluding the patch
182165
* at the given timestamp. This method removes all patches from the log that
@@ -215,6 +198,89 @@ export class Log<N extends JsonNode = JsonNode<any>, Metadata extends Record<str
215198
return;
216199
}
217200

201+
/**
202+
* @returns A deep clone of the log, including the start function, metadata,
203+
* patches, and the end model.
204+
*/
205+
public clone(): Log<N, Metadata> {
206+
const start = this.start;
207+
const metadata = cloneBinary(this.metadata) as Metadata;
208+
const end = this.end.clone();
209+
const log = new Log(start, end, metadata);
210+
for (const {v} of this.patches.entries()) {
211+
const patch = v.clone();
212+
const id = patch.getId();
213+
if (!id) continue;
214+
log.patches.set(id, patch);
215+
}
216+
return log;
217+
}
218+
219+
// /**
220+
// * Adds a batch of patches to the log, without applying them to the `end`
221+
// * model. It is assumed that the patches are already applied to the `end`
222+
// * model, this method only adds them to the internal patch collection.
223+
// *
224+
// * If you need to apply patches to the `end` model, use `end.applyBatch(batch)`,
225+
// * it will apply them to the model and add them to the log automatically.
226+
// *
227+
// * @param batch Array of patches to add to the log.
228+
// */
229+
// public add(batch: Patch[]): void {
230+
// const patches = this.patches;
231+
// for (const patch of batch) {
232+
// const id = patch.getId();
233+
// if (id) patches.set(id, patch);
234+
// }
235+
// }
236+
237+
/**
238+
* Rebase a batch of patches on top of the current end of the log, or on top
239+
* of the latest patch for a given session ID.
240+
*
241+
* @param batch A batch of patches to rebase.
242+
* @param sid Session ID to find the latest patch for rebasing. If not provided,
243+
* the latest patch in the log is used.
244+
* @returns The rebased patches.
245+
*/
246+
public rebaseBatch(batch: Patch[], sid?: number): Patch[] {
247+
const rebasePatch = sid ? this.findMax(sid) : this.patches.max?.v;
248+
if (!rebasePatch) return batch;
249+
const rebaseId = rebasePatch.getId();
250+
if (!rebaseId) return batch;
251+
let nextTime = rebaseId.time + rebasePatch.span();
252+
const rebased: Patch[] = [];
253+
const length = batch.length;
254+
for (let i = 0; i < length; i++) {
255+
const patch = batch[i].rebase(nextTime);
256+
nextTime += patch.span();
257+
rebased.push(patch);
258+
}
259+
return rebased;
260+
}
261+
262+
/**
263+
* Resets the log to the state of another log. Consumes all state fron the `to`
264+
* log. The `to` log will be destroyed and should not be used after calling
265+
* this method.
266+
*
267+
* If you want to preserve the `to` log, use `.clone()` method first.
268+
*
269+
* ```ts
270+
* const log1 = new Log();
271+
* const log2 = new Log();
272+
* log1.reset(log2.clone());
273+
* ```
274+
*
275+
* @param to The log to consume the state from.
276+
*/
277+
public reset(to: Log<N, Metadata>): void {
278+
this.start = to.start;
279+
this.metadata = to.metadata;
280+
this.patches = to.patches;
281+
this.end.reset(to.end);
282+
to.destroy();
283+
}
218284

219285
/**
220286
* Creates a patch which reverts the given patch. The RGA insertion operations

src/json-crdt/log/__tests__/Log.spec.ts

Lines changed: 111 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -144,45 +144,63 @@ describe('.findMax()', () => {
144144
});
145145
});
146146

147-
describe('.clone()', () => {
148-
const setup = () => {
149-
const model = Model.create({foo: 'bar'});
150-
const log1 = Log.fromNewModel(model);
151-
log1.metadata = {time: 123};
152-
log1.end.api.obj([]).set({x: 1});
153-
log1.end.api.flush();
154-
log1.end.api.obj([]).set({y: 2});
155-
log1.end.api.flush();
156-
log1.end.api.obj([]).set({foo: 'baz'});
157-
log1.end.api.flush();
158-
const log2 = log1.clone();
159-
return {log1, log2};
160-
};
147+
const setupTwoLogs = () => {
148+
const model = Model.create({foo: 'bar'});
149+
const log1 = Log.fromNewModel(model);
150+
log1.metadata = {time: 123};
151+
log1.end.api.obj([]).set({x: 1});
152+
log1.end.api.flush();
153+
log1.end.api.obj([]).set({y: 2});
154+
log1.end.api.flush();
155+
log1.end.api.obj([]).set({foo: 'baz'});
156+
log1.end.api.flush();
157+
const log2 = log1.clone();
158+
return {log1, log2};
159+
};
161160

161+
const assertLogsEqual = (log1: Log<any, any>, log2: Log<any, any>) => {
162+
expect(log1.start()).not.toBe(log2.start());
163+
expect(deepEqual(log1.start().view(), log2.start().view())).toBe(true);
164+
expect(log1.start().clock.sid).toEqual(log2.start().clock.sid);
165+
expect(log1.start().clock.time).toEqual(log2.start().clock.time);
166+
expect(log1.end).not.toBe(log2.end);
167+
expect(deepEqual(log1.end.view(), log2.end.view())).toBe(true);
168+
expect(log1.end.clock.sid).toEqual(log2.end.clock.sid);
169+
expect(log1.end.clock.time).toEqual(log2.end.clock.time);
170+
expect(log1.metadata).not.toBe(log2.metadata);
171+
expect(deepEqual(log1.metadata, log2.metadata)).toBe(true);
172+
expect(log1.patches.size()).toBe(log2.patches.size());
173+
expect(log1.patches.min!.v.toBinary()).toEqual(log2.patches.min!.v.toBinary());
174+
expect(log1.patches.max!.v.toBinary()).toEqual(log2.patches.max!.v.toBinary());
175+
expect(log1.patches.min!.v).not.toBe(log2.patches.min!.v);
176+
expect(log1.patches.max!.v).not.toBe(log2.patches.max!.v);
177+
};
178+
179+
describe('.clone()', () => {
162180
test('start model has the same view and clock', () => {
163-
const {log1, log2} = setup();
181+
const {log1, log2} = setupTwoLogs();
164182
expect(log1.start()).not.toBe(log2.start());
165183
expect(deepEqual(log1.start().view(), log2.start().view())).toBe(true);
166184
expect(log1.start().clock.sid).toEqual(log2.start().clock.sid);
167185
expect(log1.start().clock.time).toEqual(log2.start().clock.time);
168186
});
169187

170188
test('end model has the same view and clock', () => {
171-
const {log1, log2} = setup();
189+
const {log1, log2} = setupTwoLogs();
172190
expect(log1.end).not.toBe(log2.end);
173191
expect(deepEqual(log1.end.view(), log2.end.view())).toBe(true);
174192
expect(log1.end.clock.sid).toEqual(log2.end.clock.sid);
175193
expect(log1.end.clock.time).toEqual(log2.end.clock.time);
176194
});
177195

178196
test('metadata is the same but has different identity', () => {
179-
const {log1, log2} = setup();
197+
const {log1, log2} = setupTwoLogs();
180198
expect(log1.metadata).not.toBe(log2.metadata);
181199
expect(deepEqual(log1.metadata, log2.metadata)).toBe(true);
182200
});
183201

184202
test('patch log is the same', () => {
185-
const {log1, log2} = setup();
203+
const {log1, log2} = setupTwoLogs();
186204
expect(log1.patches.size()).toBe(log2.patches.size());
187205
expect(log1.patches.min!.v.toBinary()).toEqual(log2.patches.min!.v.toBinary());
188206
expect(log1.patches.max!.v.toBinary()).toEqual(log2.patches.max!.v.toBinary());
@@ -191,7 +209,8 @@ describe('.clone()', () => {
191209
});
192210

193211
test('can evolve logs independently', () => {
194-
const {log1, log2} = setup();
212+
const {log1, log2} = setupTwoLogs();
213+
assertLogsEqual(log1, log2);
195214
log1.end.api.obj([]).set({a: 1});
196215
log1.end.api.flush();
197216
expect(log1.end.view()).toEqual({foo: 'baz', x: 1, y: 2, a: 1});
@@ -203,27 +222,79 @@ describe('.clone()', () => {
203222
});
204223
});
205224

206-
describe('.rebase()', () => {
207-
test('can advance the log from start', () => {
208-
const model = Model.create();
209-
const sid0 = model.clock.sid;
210-
const sid1 = Model.sid();
211-
model.api.set({foo: 'bar'});
212-
const log = Log.fromNewModel(model);
213-
log.end.api.obj([]).set({x: 1});
214-
const patch1 = log.end.api.flush();
215-
log.end.setSid(sid1);
216-
log.end.api.obj([]).set({y: 2});
217-
const patch2 = log.end.api.flush();
218-
log.end.setSid(sid0);
219-
log.end.api.obj([]).set({foo: 'baz'});
220-
const patch3 = log.end.api.flush();
221-
const found0 = log.findMax(sid0);
222-
const found1 = log.findMax(sid1);
223-
const found2 = log.findMax(12345);
224-
expect(found0).toBe(patch3);
225-
expect(found1).toBe(patch2);
226-
expect(found2).toBe(void 0);
225+
describe('.rebaseBatch()', () => {
226+
test('can rebase a concurrent batch onto another log', () => {
227+
const {log1, log2} = setupTwoLogs();
228+
log1.end.api.obj([]).set({a: 1});
229+
log2.end.api.obj([]).set({b: 2});
230+
const patch1 = log1.end.api.flush();
231+
const patch2 = log2.end.api.flush();
232+
expect(patch1.toBinary()).not.toEqual(patch2.toBinary());
233+
expect(patch1.getId()?.sid).toBe(patch2.getId()?.sid);
234+
expect(patch1.getId()?.time).toBe(patch2.getId()?.time);
235+
expect(patch1.span()).toEqual(patch2.span());
236+
const [patch3] = log1.rebaseBatch([patch2]);
237+
expect(patch1.toBinary()).not.toEqual(patch3.toBinary());
238+
expect(patch1.getId()?.sid).toBe(patch3.getId()?.sid);
239+
expect(patch1.getId()!.time + patch1.span()).toBe(patch3.getId()?.time);
240+
log1.end.applyPatch(patch3);
241+
expect(log1.end.view()).toEqual({foo: 'baz', x: 1, y: 2, a: 1, b: 2});
242+
expect(log2.end.view()).toEqual({foo: 'baz', x: 1, y: 2, b: 2});
243+
expect(() => assertLogsEqual(log1, log2)).toThrow();
244+
log2.reset(log1.clone());
245+
assertLogsEqual(log1, log2);
246+
});
247+
248+
test('can rebase a concurrent batch onto another log (multiple patches)', () => {
249+
const {log1, log2} = setupTwoLogs();
250+
log1.end.api.obj([]).set({a: 1});
251+
log2.end.api.obj([]).set({b: 2});
252+
log1.end.api.flush();
253+
const patch2 = log2.end.api.flush();
254+
log1.end.api.obj([]).set({a: 2});
255+
log2.end.api.obj([]).set({b: 3});
256+
log1.end.api.flush();
257+
const patch4 = log2.end.api.flush();
258+
log2.end.api.obj([]).set({b: 3});
259+
const patch5 = log2.end.api.flush();
260+
const batch2 = [patch2, patch4, patch5];
261+
expect(log1.end.view()).toEqual({foo: 'baz', x: 1, y: 2, a: 2});
262+
expect(log2.end.view()).toEqual({foo: 'baz', x: 1, y: 2, b: 3});
263+
const batch3 = log1.rebaseBatch(batch2);
264+
expect(batch3[0].getId()!.time).toBe(log1.end.clock.time);
265+
log1.end.applyBatch(batch3);
266+
expect(log1.end.view()).toEqual({foo: 'baz', x: 1, y: 2, a: 2, b: 3});
267+
expect(log2.end.view()).toEqual({foo: 'baz', x: 1, y: 2, b: 3});
268+
expect(() => assertLogsEqual(log1, log2)).toThrow();
269+
log2.reset(log1.clone());
270+
assertLogsEqual(log1, log2);
271+
});
272+
273+
test('can specify rebase sid', () => {
274+
const {log1, log2} = setupTwoLogs();
275+
expect(log1.end.clock.sid).toBe(log2.end.clock.sid);
276+
log1.end.api.obj([]).set({a: 1});
277+
log2.end.api.obj([]).set({b: 2});
278+
log1.end.api.flush();
279+
const patch2 = log2.end.api.flush();
280+
log1.end.setSid(12345);
281+
log1.end.api.obj([]).set({a: 2});
282+
log2.end.api.obj([]).set({b: 3});
283+
log1.end.api.flush();
284+
const patch4 = log2.end.api.flush();
285+
log2.end.api.obj([]).set({b: 3});
286+
const patch5 = log2.end.api.flush();
287+
const batch2 = [patch2, patch4, patch5];
288+
expect(log1.end.view()).toEqual({foo: 'baz', x: 1, y: 2, a: 2});
289+
expect(log2.end.view()).toEqual({foo: 'baz', x: 1, y: 2, b: 3});
290+
const batch3 = log1.rebaseBatch(batch2, log2.end.clock.sid);
291+
expect(batch3[0].getId()!.time).not.toBe(log1.end.clock.time);
292+
log1.end.applyBatch(batch3);
293+
expect(log1.end.view()).toEqual({foo: 'baz', x: 1, y: 2, a: 2, b: 3});
294+
expect(log2.end.view()).toEqual({foo: 'baz', x: 1, y: 2, b: 3});
295+
expect(() => assertLogsEqual(log1, log2)).toThrow();
296+
log2.reset(log1.clone());
297+
assertLogsEqual(log1, log2);
227298
});
228299
});
229300

0 commit comments

Comments
 (0)