Skip to content

Conversation

@VeskeR
Copy link
Contributor

@VeskeR VeskeR commented Oct 9, 2025

Resolves PUB-2062

Summary by CodeRabbit

  • New Features

    • Added experimental async-iterator subscriptions for Objects and Instances via subscribeIterator, enabling consumption of updates with for await...of.
    • Path-based subscriptions honor existing options (including depth) and support multiple concurrent iterators with clean teardown.
    • Introduced a utility to convert listener-based subscriptions into async iterables.
  • Tests

    • Expanded test coverage for async-iterator subscriptions, concurrency, lifecycle, and error scenarios.

@coderabbitai
Copy link

coderabbitai bot commented Oct 9, 2025

Walkthrough

Adds async-iterator subscription APIs and a utility to convert listener callbacks into AsyncIterableIterators; updates PathObject and Instance implementations, type declarations, private API tracking, and tests covering iteration, depth, concurrency, cleanup, and error cases.

Changes

Cohort / File(s) Summary of changes
Type declarations
ably.d.ts
Added subscribeIterator(options?: PathObjectSubscriptionOptions) to PathObjectBase<_T extends Value> and subscribeIterator() to InstanceBase<T extends Value> returning AsyncIterableIterator of subscription events.
Iterator utility
src/common/lib/util/utils.ts
Added RegisterListenerFunction<T> type and listenerToAsyncIterator<T>(registerListener) which converts a listener-based registration into an AsyncIterableIterator with queuing, concurrency protection, and cleanup/unregister on termination.
Objects — Instance
src/plugins/objects/instance.ts
Implemented DefaultInstance.subscribeIterator() that validates the instance is a LiveObject, and returns an async iterator by wrapping existing subscribe(listener) via Utils.listenerToAsyncIterator.
Objects — PathObject
src/plugins/objects/pathobject.ts
Implemented DefaultPathObject.subscribeIterator(options?) which delegates to existing subscribe(listener, options) and wraps the returned unsubscribe in listenerToAsyncIterator.
Tests — Realtime objects
test/realtime/objects.test.js
Added tests covering PathObject.subscribeIterator() and DefaultInstance.subscribeIterator() for event yields, depth control, concurrent iterators, cleanup/unsubscribe behavior, and error handling for non-LiveObject values.
Tests — Private API recorder
test/common/modules/private_api_recorder.js
Extended tracked private API identifiers (e.g., call.EventEmitter.listeners, call.RealtimeObject.getPathObjectSubscriptionRegister, read.DefaultInstance._value, read.LiveMap._dataRef.data, read.LiveObject._instanceSubscriptions, read.PathObjectSubscriptionRegister._subscriptions) and adjusted ordering.

Sequence Diagram(s)

sequenceDiagram
  autonumber
  actor Dev as Developer Code
  participant Obj as PathObject / Instance
  participant Util as Utils.listenerToAsyncIterator
  participant Sub as subscribe(listener)
  participant Iter as AsyncIterator

  Dev->>Obj: subscribeIterator(options?)
  Obj->>Util: listenerToAsyncIterator(registerListener)
  Util->>Sub: register listener (subscribe)
  Sub-->>Util: returns unsubscribe()
  Util-->>Dev: AsyncIterableIterator (Iter)

  note over Sub,Util: On updates
  Sub-->>Util: event
  Util-->>Iter: enqueue / resolve next()

  rect rgba(220,240,255,0.4)
  Dev->>Iter: for await (const e of iterator)
  Iter-->>Dev: events (subscription updates)
  end

  alt Iterator closed / break
    Dev-->>Iter: break / return()
    Util->>Sub: call unsubscribe()
  else Error / termination
    Iter-->>Dev: error
    Util->>Sub: unsubscribe() on cleanup
  end
Loading

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

  • Files to pay extra attention to:
    • src/common/lib/util/utils.ts — correctness of async generator queueing, concurrency guard, and cleanup logic.
    • src/plugins/objects/instance.ts & src/plugins/objects/pathobject.ts — correct wiring of subscribe/unsubscribe and error handling for non-LiveObject cases.
    • test/realtime/objects.test.js — ensure tests correctly exercise iterator cancellation and concurrency.

Poem

I twitch my nose at streams that flow,
Events hop in lines I know.
For-await I nibble, event by bite,
Depth and breaks, all tidy and light.
A rabbit's cheer — async iterator delight. 🐇✨

Pre-merge checks and finishing touches

✅ Passed checks (4 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title accurately summarizes the main change: implementing async iterator subscriptions for LiveObjects, which is the primary focus of all code modifications.
Linked Issues check ✅ Passed The PR implements async iterator-based subscriptions API for LiveObjects by adding subscribeIterator methods to PathObject and Instance interfaces, along with supporting utility functions.
Out of Scope Changes check ✅ Passed All changes are directly related to implementing the async iterator subscriptions API; no unrelated modifications detected. Private API recorder updates and tests are necessary supporting changes.
✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch PUB-2062/subscription-async-iterator

Warning

There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure.

🔧 ast-grep (0.39.7)
test/realtime/objects.test.js

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@github-actions github-actions bot temporarily deployed to staging/pull/2101/features October 9, 2025 08:06 Inactive
@github-actions github-actions bot temporarily deployed to staging/pull/2101/bundle-report October 9, 2025 08:06 Inactive
@github-actions github-actions bot temporarily deployed to staging/pull/2101/typedoc October 9, 2025 08:06 Inactive
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (6)
src/common/lib/util/utils.ts (1)

481-532: Solid async-iterator helper; consider small ergonomics improvements

  • Document that concurrent next() is unsupported and that events are buffered without backpressure.
  • Optionally accept an AbortSignal and/or maxQueue to allow caller-controlled cancellation/backpressure.

Apply this minimal doc tweak to set expectations:

-/**
- * Converts a listener-based event emitter API into an async iterator
+/**
+ * Converts a listener-based event emitter API into an async iterator
  * that can be consumed using a `for await...of` loop.
  *
  * @param registerListener - A function that registers a listener and returns a function to remove it
  * @returns An async iterator that yields events from the listener
+ *
+ * Notes:
+ * - Concurrent `next()` calls are not supported; a single consumer is expected.
+ * - Events are buffered in-memory with no max size; consumers should iterate promptly to avoid growth.
  */

Please also confirm that BaseClient.Utils surfaces listenerToAsyncIterator everywhere it’s referenced (e.g., in modular/bundled builds).

ably.d.ts (2)

2517-2529: Clarify iterator semantics in docs

Add a brief note on concurrency/backpressure to set expectations.

  /**
   * Registers a subscription listener and returns an async iterator that yields
   * subscription events each time the object or a primitive value at this path is updated.
   *
   * This method functions in the same way as the regular {@link PathObjectBase.subscribe | PathObject.subscribe()} method,
   * but instead returns an async iterator that can be used in a `for await...of` loop for convenience.
+  *
+  * Notes:
+  * - Only a single consumer is supported (no concurrent `next()` calls).
+  * - Events are buffered in-memory without a max size; iterate promptly to avoid growth.
+  * - Exiting the loop (e.g., `break`) cancels the subscription.
   *
   * @param options - Optional subscription configuration.
   * @returns An async iterator that yields {@link PathObjectSubscriptionEvent} objects.
   * @experimental
   */

2940-2951: Mirror iterator semantics note for Instance.subscribeIterator

Same concise note as PathObject to avoid surprises for users.

  /**
   * Registers a subscription listener and returns an async iterator that yields
   * subscription events each time this instance is updated.
   *
   * This method functions in the same way as the regular {@link InstanceBase.subscribe | Instance.subscribe()} method,
   * but instead returns an async iterator that can be used in a `for await...of` loop for convenience.
+  *
+  * Notes:
+  * - Only a single consumer is supported (no concurrent `next()` calls).
+  * - Events are buffered in-memory without a max size; iterate promptly to avoid growth.
+  * - Exiting the loop (e.g., `break`) cancels the subscription.
   *
   * @returns An async iterator that yields {@link InstanceSubscriptionEvent} objects.
   * @experimental
   */
test/realtime/objects.test.js (3)

5108-5141: Avoid race when asserting active subscription count

The assertion reading _subscriptions.size immediately after starting the iterator can race with async listener registration. Add a nextTick before asserting to reduce flakiness.

Apply this diff:

             const iteratorPromise = (async () => {
               for await (const _ of entryPathObject.subscribeIterator()) {
                 eventCount++;
                 if (eventCount >= 2) break;
               }
             })();

+            // ensure registration tick completes before asserting
+            helper.recordPrivateApi('call.Platform.nextTick');
+            await new Promise((res) => nextTick(res));
+
             helper.recordPrivateApi('call.RealtimeObject.getPathObjectSubscriptionRegister');
             helper.recordPrivateApi('read.PathObjectSubscriptionRegister._subscriptions');
             expect(realtimeObject.getPathObjectSubscriptionRegister()._subscriptions.size).to.equal(
               1,
               'Check one active subscription',
             );

6126-6160: Stabilize listener-count assertion for DefaultInstance iterator

Same race risk as the PathObject case. Insert a nextTick before reading internal listeners to avoid intermittent failures.

Apply this diff:

             const iteratorPromise = (async () => {
               for await (const _ of instance.subscribeIterator()) {
                 eventCount++;
                 if (eventCount >= 2) break;
               }
             })();

-            expect(registeredListeners(instance).length).to.equal(1, 'Check one active listener');
+            // ensure registration completes before asserting
+            helper.recordPrivateApi('call.Platform.nextTick');
+            await new Promise((res) => nextTick(res));
+            expect(registeredListeners(instance).length).to.equal(1, 'Check one active listener');

6208-6223: Make “throws on non‑LiveObject” robust to deferred validation

If subscribeIterator() validates at iteration time (not at call), the current sync expectation could miss the failure. Consume one iteration in an async wrapper so it fails in both cases.

Apply this diff:

-            expect(() => {
-              primitiveInstance.subscribeIterator();
-            }).to.throw('Cannot subscribe to a non-LiveObject instance');
+            await expectToThrowAsync(
+              async () => {
+                // Attempt to iterate once; covers both sync and deferred validation
+                for await (const _ of primitiveInstance.subscribeIterator()) {
+                  break;
+                }
+              },
+              'Cannot subscribe to a non-LiveObject instance',
+            );
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

Disabled knowledge base sources:

  • Jira integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between eb94121 and 7465b0b.

📒 Files selected for processing (6)
  • ably.d.ts (2 hunks)
  • src/common/lib/util/utils.ts (1 hunks)
  • src/plugins/objects/instance.ts (1 hunks)
  • src/plugins/objects/pathobject.ts (1 hunks)
  • test/common/modules/private_api_recorder.js (3 hunks)
  • test/realtime/objects.test.js (2 hunks)
🧰 Additional context used
🧬 Code graph analysis (4)
src/common/lib/util/utils.ts (1)
src/common/lib/types/errorinfo.ts (1)
  • ErrorInfo (35-66)
src/plugins/objects/pathobject.ts (1)
ably.d.ts (2)
  • PathObjectSubscriptionOptions (3155-3165)
  • PathObjectSubscriptionEvent (3145-3150)
src/plugins/objects/instance.ts (2)
ably.d.ts (2)
  • InstanceSubscriptionEvent (3170-3175)
  • LiveObject (2451-2451)
src/plugins/objects/liveobject.ts (1)
  • unsubscribe (111-122)
test/realtime/objects.test.js (3)
src/plugins/objects/livemap.ts (1)
  • LiveMap (52-1131)
src/plugins/objects/livecounter.ts (1)
  • LiveCounter (23-374)
src/plugins/objects/pathobject.ts (1)
  • instance (131-150)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (6)
  • GitHub Check: test-browser (webkit)
  • GitHub Check: test-browser (firefox)
  • GitHub Check: test-browser (chromium)
  • GitHub Check: test-node (20.x)
  • GitHub Check: test-node (16.x)
  • GitHub Check: test-node (18.x)
🔇 Additional comments (6)
src/plugins/objects/instance.ts (1)

162-170: subscribeIterator implementation looks good

Type guard, wrapping, and teardown via returned unsubscribe are correct.

test/common/modules/private_api_recorder.js (1)

17-17: Private API surface updates align with new functionality

Additions look correct and consistent with new iterator/subscription internals.

Also applies to: 32-32, 82-88

src/plugins/objects/pathobject.ts (1)

287-293: subscribeIterator correctly delegates and cleans up

Options are forwarded; unsubscribe is returned for teardown. Looks good.

test/realtime/objects.test.js (3)

5030-5056: Good coverage of basic async iteration on PathObject

Iterates, collects, and breaks predictably; validates event shape. Looks solid.


5060-5101: Depth filtering behavior verified clearly

Depth=1 excludes nested updates and asserts payload shape; this is precise and valuable.


5148-5184: Concurrent iterators scenario is clear and correct

Two independent consumers, distinct break conditions; asserts counts. Good.

Copy link
Contributor

@mschristensen mschristensen left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice, LGTM

@VeskeR VeskeR force-pushed the PUB-2064/compact-representation branch 3 times, most recently from f3eee80 to b1d0234 Compare November 4, 2025 08:40
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (5)
test/realtime/objects.test.js (5)

5171-5174: Reduce brittleness of payload assertions

Deep-equality on event.message.payload.update ties tests to exact shape. Prefer checking key-specific properties to allow forward-compatible payload extensions.

Apply this minimal change:

-                expect(event.message.payload.update).to.deep.equal({ directKey: 'updated' }, 'Check event payload');
+                expect(event.message?.payload?.update)
+                  .to.have.property('directKey', 'updated', 'Check event payload');

5217-5243: Avoid hard-coupling to private subscription registry

Asserting realtimeObject.getPathObjectSubscriptionRegister()._subscriptions.size is brittle against internal refactors. You already validate no further events are yielded; rely on that, or guard the private check behind an existence test.

Example:

-            expect(realtimeObject.getPathObjectSubscriptionRegister()._subscriptions.size).to.equal(
-              1,
-              'Check one active subscription',
-            );
+            const reg = realtimeObject.getPathObjectSubscriptionRegister?.();
+            if (reg?._subscriptions) {
+              expect(reg._subscriptions.size).to.equal(1, 'Check one active subscription');
+            }

6108-6115: Mirror the less‑brittle payload check for DefaultInstance

Same note as for PathObject: assert properties rather than strict object equality to decouple from payload growth.

-                expect(event.message.payload.update).to.deep.equal(
-                  { amount: events.length === 0 ? 1 : -2 },
-                  'Check event message payload',
-                );
+                expect(event.message?.payload?.update?.amount)
+                  .to.equal(events.length === 0 ? 1 : -2, 'Check event message payload');

6181-6186: Minimize reliance on internal EventEmitter state

Accessing instance._value._instanceSubscriptions.listeners('updated') risks breakage with internal changes. Prefer black‑box validation (no more events after break) or feature-detect before asserting.

-            expect(registeredListeners(instance).length).to.equal(1, 'Check one active listener');
+            const listeners = registeredListeners?.(instance) ?? [];
+            if (Array.isArray(listeners)) {
+              expect(listeners.length).to.equal(1, 'Check one active listener');
+            }-            expect(registeredListeners(instance)?.length ?? 0).to.equal(
-              0,
-              'Check no active listeners after breaking out of iterator',
-            );
+            const remaining = registeredListeners?.(instance) ?? [];
+            if (Array.isArray(remaining)) {
+              expect(remaining.length).to.equal(0, 'Check no active listeners after breaking out of iterator');
+            }

Also applies to: 6198-6199


6275-6278: Assert error code for non‑LiveObject subscribeIterator

Other tests assert code 92007; add it here for consistency and stronger guarantee.

-            expect(() => {
-              primitiveInstance.subscribeIterator();
-            }).to.throw('Cannot subscribe to a non-LiveObject instance');
+            expect(() => {
+              primitiveInstance.subscribeIterator();
+            })
+              .to.throw('Cannot subscribe to a non-LiveObject instance')
+              .with.property('code', 92007);
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

Disabled knowledge base sources:

  • Jira integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 7465b0b and 3caec7a.

📒 Files selected for processing (6)
  • ably.d.ts (2 hunks)
  • src/common/lib/util/utils.ts (1 hunks)
  • src/plugins/objects/instance.ts (1 hunks)
  • src/plugins/objects/pathobject.ts (1 hunks)
  • test/common/modules/private_api_recorder.js (3 hunks)
  • test/realtime/objects.test.js (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (4)
  • src/plugins/objects/pathobject.ts
  • src/common/lib/util/utils.ts
  • src/plugins/objects/instance.ts
  • ably.d.ts
🧰 Additional context used
🧬 Code graph analysis (1)
test/realtime/objects.test.js (4)
src/plugins/objects/livemap.ts (1)
  • LiveMap (49-1038)
objects.d.ts (2)
  • LiveMap (16-28)
  • LiveCounter (33-43)
src/plugins/objects/livecounter.ts (1)
  • LiveCounter (15-301)
src/plugins/objects/pathobject.ts (1)
  • instance (182-201)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (6)
  • GitHub Check: test-node (20.x)
  • GitHub Check: test-node (16.x)
  • GitHub Check: test-node (18.x)
  • GitHub Check: test-browser (webkit)
  • GitHub Check: test-browser (firefox)
  • GitHub Check: test-browser (chromium)
🔇 Additional comments (3)
test/common/modules/private_api_recorder.js (2)

17-17: LGTM! New subscription-related call identifiers added correctly.

The new call.EventEmitter.listeners and call.RealtimeObject.getPathObjectSubscriptionRegister identifiers are properly sorted and formatted, aligning with the PR's subscription functionality.

Also applies to: 32-32


82-87: Partial verification successful; manual review recommended for three identifiers.

The first verification confirmed three of the six identifiers in the snippet are correctly tracked and used:

  • read.DefaultInstance._value ✓ used at line 6182
  • read.LiveObject._instanceSubscriptions ✓ used at line 6183
  • read.PathObjectSubscriptionRegister._subscriptions ✓ used at lines 5218, 5237

However, three identifiers (read.Defaults.version, read.EventEmitter.events, read.LiveMap._dataRef.data) were not found in the filtered search. These could be added preemptively for future tests or used in different patterns. Verify they are necessary or intentionally reserved.

test/realtime/objects.test.js (1)

5247-5285: Concurrent iterators coverage looks solid

Good independent-iterator validation with deterministic break conditions. 👍

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Development

Successfully merging this pull request may close these issues.

3 participants